diff --git a/src/body/body.rs b/src/body/body.rs index c31635e26a..339b77d63a 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -12,6 +12,7 @@ use http::HeaderMap; use http_body::{Body as HttpBody, SizeHint}; use crate::common::{task, watch, Future, Never, Pin, Poll}; +use crate::proto::h2::bdp; use crate::proto::DecodedLength; use crate::upgrade::OnUpgrade; @@ -61,6 +62,11 @@ struct Extra { /// connection yet. delayed_eof: Option, on_upgrade: OnUpgrade, + + /// Records bytes read to compute the BDP. + /// + /// Only used with `H2` variant. + h2_bdp: bdp::Sampler, } type DelayEofUntil = oneshot::Receiver; @@ -175,11 +181,21 @@ impl Body { Body { kind, extra: None } } - pub(crate) fn h2(recv: h2::RecvStream, content_length: DecodedLength) -> Self { - Body::new(Kind::H2 { + pub(crate) fn h2( + recv: h2::RecvStream, + content_length: DecodedLength, + bdp: bdp::Sampler, + ) -> Self { + let mut body = Body::new(Kind::H2 { content_length, recv, - }) + }); + + if bdp.is_enabled() { + body.extra_mut().h2_bdp = bdp; + } + + body } pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) { @@ -204,6 +220,7 @@ impl Body { Box::new(Extra { delayed_eof: None, on_upgrade: OnUpgrade::none(), + h2_bdp: bdp::disabled(), }) }) } @@ -262,6 +279,9 @@ impl Body { Some(Ok(bytes)) => { let _ = h2.flow_control().release_capacity(bytes.len()); len.sub_if(bytes.len() as u64); + if let Some(ref extra) = self.extra { + extra.h2_bdp.sample(bytes.len()); + } Poll::Ready(Some(Ok(bytes))) } Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), diff --git a/src/client/conn.rs b/src/client/conn.rs index a81f1f05ed..f7b24b5268 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -35,12 +35,6 @@ where H2(#[pin] proto::h2::ClientTask), } -// Our defaults are chosen for the "majority" case, which usually are not -// resource constrained, and so the spec default of 64kb can be too limiting -// for performance. -const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb -const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb - /// Returns a handshake future over some IO. /// /// This is a shortcut for `Builder::new().handshake(io)`. @@ -82,7 +76,7 @@ pub struct Builder { h1_read_buf_exact_size: Option, h1_max_buf_size: Option, http2: bool, - h2_builder: h2::client::Builder, + h2_builder: proto::h2::client::Config, } /// A future returned by `SendRequest::send_request`. @@ -420,12 +414,6 @@ impl Builder { /// Creates a new connection builder. #[inline] pub fn new() -> Builder { - let mut h2_builder = h2::client::Builder::default(); - h2_builder - .initial_window_size(DEFAULT_HTTP2_STREAM_WINDOW) - .initial_connection_window_size(DEFAULT_HTTP2_CONN_WINDOW) - .enable_push(false); - Builder { exec: Exec::Default, h1_writev: true, @@ -433,7 +421,7 @@ impl Builder { h1_title_case_headers: false, h1_max_buf_size: None, http2: false, - h2_builder, + h2_builder: Default::default(), } } @@ -491,7 +479,8 @@ impl Builder { /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { if let Some(sz) = sz.into() { - self.h2_builder.initial_window_size(sz); + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_stream_window_size = sz; } self } @@ -506,7 +495,24 @@ impl Builder { sz: impl Into>, ) -> &mut Self { if let Some(sz) = sz.into() { - self.h2_builder.initial_connection_window_size(sz); + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_conn_window_size = sz; + } + self + } + + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + use proto::h2::SPEC_WINDOW_SIZE; + + self.h2_builder.adaptive_window = enabled; + if enabled { + self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; + self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; } self } diff --git a/src/client/mod.rs b/src/client/mod.rs index ce598d40b3..a9f7ab0666 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -996,6 +996,16 @@ impl Builder { self } + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + self.conn_builder.http2_adaptive_window(enabled); + self + } + /// Sets the maximum idle connection per host allowed in the pool. /// /// Default is `usize::MAX` (no limit). diff --git a/src/proto/h2/bdp.rs b/src/proto/h2/bdp.rs new file mode 100644 index 0000000000..611eb7acc4 --- /dev/null +++ b/src/proto/h2/bdp.rs @@ -0,0 +1,190 @@ +// What should it do? +// +// # BDP Algorithm +// +// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: +// 1a. Record current time. +// 1b. Send a BDP ping. +// 2. Increment the number of received bytes. +// 3. When the BDP ping ack is received: +// 3a. Record duration from sent time. +// 3b. Merge RTT with a running average. +// 3c. Calculate bdp as bytes/rtt. +// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. +// +// +// # Implementation +// +// - `hyper::Body::h2` variant includes a "bdp channel" +// - When the body's `poll_data` yields bytes, call `bdp.sample(bytes.len())` +// + +use std::sync::{Arc, Mutex, Weak}; +use std::task::{self, Poll}; +use std::time::{Duration, Instant}; + +use h2::{Ping, PingPong}; + +type WindowSize = u32; + +/// Any higher than this likely will be hitting the TCP flow control. +const BDP_LIMIT: usize = 1024 * 1024 * 16; + +pub(crate) fn disabled() -> Sampler { + Sampler { + shared: Weak::new(), + } +} + +pub(super) fn channel(ping_pong: PingPong, initial_window: WindowSize) -> (Sampler, Estimator) { + let shared = Arc::new(Mutex::new(Shared { + bytes: 0, + ping_pong, + ping_sent: false, + sent_at: Instant::now(), + })); + + ( + Sampler { + shared: Arc::downgrade(&shared), + }, + Estimator { + bdp: initial_window, + max_bandwidth: 0.0, + shared, + samples: 0, + rtt: 0.0, + }, + ) +} + +#[derive(Clone)] +pub(crate) struct Sampler { + shared: Weak>, +} + +pub(super) struct Estimator { + shared: Arc>, + + /// Current BDP in bytes + bdp: u32, + /// Largest bandwidth we've seen so far. + max_bandwidth: f64, + /// Count of samples made (ping sent and received) + samples: usize, + /// Round trip time in seconds + rtt: f64, +} + +struct Shared { + bytes: usize, + ping_pong: PingPong, + ping_sent: bool, + sent_at: Instant, +} + +impl Sampler { + pub(crate) fn sample(&self, bytes: usize) { + let shared = if let Some(shared) = self.shared.upgrade() { + shared + } else { + return; + }; + + let mut inner = shared.lock().unwrap(); + + if !inner.ping_sent { + if let Ok(()) = inner.ping_pong.send_ping(Ping::opaque()) { + inner.ping_sent = true; + inner.sent_at = Instant::now(); + trace!("sending BDP ping"); + } else { + return; + } + } + + inner.bytes += bytes; + } + + pub(crate) fn is_enabled(&self) -> bool { + self.shared.strong_count() > 0 + } +} + +impl Estimator { + pub(super) fn poll_estimate(&mut self, cx: &mut task::Context<'_>) -> Poll { + let mut inner = self.shared.lock().unwrap(); + if !inner.ping_sent { + // XXX: this doesn't register a waker...? + return Poll::Pending; + } + + let (bytes, rtt) = match ready!(inner.ping_pong.poll_pong(cx)) { + Ok(_pong) => { + let rtt = inner.sent_at.elapsed(); + let bytes = inner.bytes; + inner.bytes = 0; + inner.ping_sent = false; + self.samples += 1; + trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); + (bytes, rtt) + } + Err(e) => { + debug!("bdp pong error: {}", e); + return Poll::Pending; + } + }; + + drop(inner); + + if let Some(bdp) = self.calculate(bytes, rtt) { + Poll::Ready(bdp) + } else { + // XXX: this doesn't register a waker...? + Poll::Pending + } + } + + fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option { + // No need to do any math if we're at the limit. + if self.bdp as usize == BDP_LIMIT { + return None; + } + + // average the rtt + let rtt = seconds(rtt); + if self.samples < 10 { + // Average the first 10 samples + self.rtt += (rtt - self.rtt) / (self.samples as f64); + } else { + self.rtt += (rtt - self.rtt) / 0.9; + } + + // calculate the current bandwidth + let bw = (bytes as f64) / (self.rtt * 1.5); + trace!("current bandwidth = {:.1}B/s", bw); + + if bw < self.max_bandwidth { + // not a faster bandwidth, so don't update + return None; + } else { + self.max_bandwidth = bw; + } + + // if the current `bytes` sample is at least 2/3 the previous + // bdp, increase to double the current sample. + if (bytes as f64) >= (self.bdp as f64) * 0.66 { + self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; + trace!("BDP increased to {}", self.bdp); + Some(self.bdp) + } else { + None + } + } +} + +fn seconds(dur: Duration) -> f64 { + const NANOS_PER_SEC: f64 = 1_000_000_000.0; + let secs = dur.as_secs() as f64; + secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC +} diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 9b87121e87..0fc00d33ea 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -4,7 +4,7 @@ use futures_util::stream::StreamExt as _; use h2::client::{Builder, SendRequest}; use tokio::io::{AsyncRead, AsyncWrite}; -use super::{decode_content_length, PipeToSendStream, SendBuf}; +use super::{bdp, decode_content_length, PipeToSendStream, SendBuf}; use crate::body::Payload; use crate::common::{task, Exec, Future, Never, Pin, Poll}; use crate::headers; @@ -21,17 +21,43 @@ type ConnDropRef = mpsc::Sender; ///// the "dispatch" task will be notified and can shutdown sooner. type ConnEof = oneshot::Receiver; +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + } + } +} + pub(crate) async fn handshake( io: T, req_rx: ClientRx, - builder: &Builder, + config: &Config, exec: Exec, ) -> crate::Result> where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, B: Payload, { - let (h2_tx, conn) = builder + let (h2_tx, mut conn) = Builder::default() + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size) + .enable_push(false) .handshake::<_, SendBuf>(io) .await .map_err(crate::Error::new_h2)?; @@ -49,27 +75,34 @@ where } }); - let conn = conn.map_err(|e| debug!("connection error: {}", e)); + let sampler = if config.adaptive_window { + let (sampler, mut estimator) = + bdp::channel(conn.ping_pong().unwrap(), config.initial_stream_window_size); - let conn_task = async move { - match future::select(conn, conn_drop_rx).await { - Either::Left(_) => { - // ok or err, the `conn` has finished - } - Either::Right(((), conn)) => { - // mpsc has been dropped, hopefully polling - // the connection some more should start shutdown - // and then close - trace!("send_request dropped, starting conn shutdown"); - drop(cancel_tx); - let _ = conn.await; + let conn = future::poll_fn(move |cx| { + match estimator.poll_estimate(cx) { + Poll::Ready(wnd) => { + conn.set_target_window_size(wnd); + conn.set_initial_window_size(wnd)?; + } + Poll::Pending => {} } - } - }; - exec.execute(conn_task); + Pin::new(&mut conn).poll(cx) + }); + let conn = conn.map_err(|e| debug!("connection error: {}", e)); + + exec.execute(conn_task(conn, conn_drop_rx, cancel_tx)); + sampler + } else { + let conn = conn.map_err(|e| debug!("connection error: {}", e)); + + exec.execute(conn_task(conn, conn_drop_rx, cancel_tx)); + bdp::disabled() + }; Ok(ClientTask { + bdp: sampler, conn_drop_ref, conn_eof, executor: exec, @@ -78,10 +111,31 @@ where }) } +async fn conn_task(conn: C, drop_rx: D, cancel_tx: oneshot::Sender) +where + C: Future + Unpin, + D: Future + Unpin, +{ + match future::select(conn, drop_rx).await { + Either::Left(_) => { + // ok or err, the `conn` has finished + } + Either::Right(((), conn)) => { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close + trace!("send_request dropped, starting conn shutdown"); + drop(cancel_tx); + let _ = conn.await; + } + } +} + pub(crate) struct ClientTask where B: Payload, { + bdp: bdp::Sampler, conn_drop_ref: ConnDropRef, conn_eof: ConnEof, executor: Exec, @@ -156,10 +210,12 @@ where } } + let bdp = self.bdp.clone(); let fut = fut.map(move |result| match result { Ok(res) => { let content_length = decode_content_length(res.headers()); - let res = res.map(|stream| crate::Body::h2(stream, content_length)); + let res = + res.map(|stream| crate::Body::h2(stream, content_length, bdp)); Ok(res) } Err(err) => { diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index eab57be297..80d52349c7 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -12,12 +12,16 @@ use crate::body::Payload; use crate::common::{task, Future, Pin, Poll}; use crate::headers::content_length_parse_all; +pub(crate) mod bdp; pub(crate) mod client; pub(crate) mod server; pub(crate) use self::client::ClientTask; pub(crate) use self::server::Server; +/// Default initial stream window size defined in HTTP2 spec. +pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535; + fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { // List of connection headers from: // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 84689b3d46..b8d1afb925 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -1,12 +1,12 @@ use std::error::Error as StdError; use std::marker::Unpin; -use h2::server::{Builder, Connection, Handshake, SendResponse}; +use h2::server::{Connection, Handshake, SendResponse}; use h2::Reason; use pin_project::{pin_project, project}; use tokio::io::{AsyncRead, AsyncWrite}; -use super::{decode_content_length, PipeToSendStream, SendBuf}; +use super::{bdp, decode_content_length, PipeToSendStream, SendBuf}; use crate::body::Payload; use crate::common::exec::H2Exec; use crate::common::{task, Future, Pin, Poll}; @@ -16,6 +16,34 @@ use crate::service::HttpService; use crate::{Body, Response}; +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +// +// At the same time, a server more often has multiple clients connected, and +// so is more likely to use more resources than a client would. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, + pub(crate) max_concurrent_streams: Option, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + max_concurrent_streams: None, + } + } +} + #[pin_project] pub(crate) struct Server where @@ -31,7 +59,13 @@ enum State where B: Payload, { - Handshaking(Handshake>), + Handshaking { + /// If Some, bdp is enabled with the initial size. + /// + /// If None, bdp is disabled. + bdp_initial_size: Option, + hs: Handshake>, + }, Serving(Serving), Closed, } @@ -40,6 +74,7 @@ struct Serving where B: Payload, { + bdp: Option<(bdp::Sampler, bdp::Estimator)>, conn: Connection>, closing: Option, } @@ -52,11 +87,28 @@ where B: Payload, E: H2Exec, { - pub(crate) fn new(io: T, service: S, builder: &Builder, exec: E) -> Server { + pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server { + let mut builder = h2::server::Builder::default(); + builder + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size); + if let Some(max) = config.max_concurrent_streams { + builder.max_concurrent_streams(max); + } let handshake = builder.handshake(io); + + let bdp = if config.adaptive_window { + Some(config.initial_stream_window_size) + } else { + None + }; + Server { exec, - state: State::Handshaking(handshake), + state: State::Handshaking { + bdp_initial_size: bdp, + hs: handshake, + }, service, } } @@ -64,7 +116,7 @@ where pub fn graceful_shutdown(&mut self) { trace!("graceful_shutdown"); match self.state { - State::Handshaking(..) => { + State::Handshaking { .. } => { // fall-through, to replace state with Closed } State::Serving(ref mut srv) => { @@ -95,9 +147,15 @@ where let me = &mut *self; loop { let next = match me.state { - State::Handshaking(ref mut h) => { - let conn = ready!(Pin::new(h).poll(cx).map_err(crate::Error::new_h2))?; + State::Handshaking { + ref mut hs, + ref bdp_initial_size, + } => { + let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?; + let bdp = bdp_initial_size + .map(|wnd| bdp::channel(conn.ping_pong().expect("ping_pong"), wnd)); State::Serving(Serving { + bdp, conn, closing: None, }) @@ -135,7 +193,12 @@ where { if self.closing.is_none() { loop { - // At first, polls the readiness of supplied service. + self.poll_bdp(cx); + + // Check that the service is ready to accept a new request. + // + // - If not, just drive the connection some. + // - If ready, try to accept a new request from the connection. match service.poll_ready(cx) { Poll::Ready(Ok(())) => (), Poll::Pending => { @@ -168,7 +231,14 @@ where Some(Ok((req, respond))) => { trace!("incoming request"); let content_length = decode_content_length(req.headers()); - let req = req.map(|stream| crate::Body::h2(stream, content_length)); + let bdp_sampler = self + .bdp + .as_ref() + .map(|bdp| bdp.0.clone()) + .unwrap_or_else(bdp::disabled); + + let req = + req.map(|stream| crate::Body::h2(stream, content_length, bdp_sampler)); let fut = H2Stream::new(service.call(req), respond); exec.execute_h2stream(fut); } @@ -193,6 +263,18 @@ where Poll::Ready(Err(self.closing.take().expect("polled after error"))) } + + fn poll_bdp(&mut self, cx: &mut task::Context<'_>) { + if let Some((_, ref mut estimator)) = self.bdp { + match estimator.poll_estimate(cx) { + Poll::Ready(wnd) => { + self.conn.set_target_window_size(wnd); + let _ = self.conn.set_initial_window_size(wnd); + } + Poll::Pending => {} + } + } + } } #[allow(missing_debug_implementations)] diff --git a/src/server/conn.rs b/src/server/conn.rs index 9e7cec9ff1..889cb6b206 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -36,15 +36,6 @@ pub(super) use self::upgrades::UpgradeableConnection; #[cfg(feature = "tcp")] pub use super::tcp::{AddrIncoming, AddrStream}; -// Our defaults are chosen for the "majority" case, which usually are not -// resource constrained, and so the spec default of 64kb can be too limiting -// for performance. -// -// At the same time, a server more often has multiple clients connected, and -// so is more likely to use more resources than a client would. -const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024; // 1mb -const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb - /// A lower-level configuration of the HTTP protocol. /// /// This structure is used to configure options for an HTTP server connection. @@ -56,7 +47,7 @@ pub struct Http { exec: E, h1_half_close: bool, h1_writev: bool, - h2_builder: h2::server::Builder, + h2_builder: proto::h2::server::Config, mode: ConnectionMode, keep_alive: bool, max_buf_size: Option, @@ -145,7 +136,7 @@ where #[derive(Clone, Debug)] enum Fallback { - ToHttp2(h2::server::Builder, E), + ToHttp2(proto::h2::server::Config, E), Http1Only, } @@ -188,16 +179,11 @@ impl Http { /// Creates a new instance of the HTTP protocol, ready to spawn a server or /// start accepting connections. pub fn new() -> Http { - let mut h2_builder = h2::server::Builder::default(); - h2_builder - .initial_window_size(DEFAULT_HTTP2_STREAM_WINDOW) - .initial_connection_window_size(DEFAULT_HTTP2_CONN_WINDOW); - Http { exec: Exec::Default, h1_half_close: false, h1_writev: true, - h2_builder, + h2_builder: Default::default(), mode: ConnectionMode::Fallback, keep_alive: true, max_buf_size: None, @@ -268,7 +254,8 @@ impl Http { /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { if let Some(sz) = sz.into() { - self.h2_builder.initial_window_size(sz); + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_stream_window_size = sz; } self } @@ -283,7 +270,24 @@ impl Http { sz: impl Into>, ) -> &mut Self { if let Some(sz) = sz.into() { - self.h2_builder.initial_connection_window_size(sz); + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_conn_window_size = sz; + } + self + } + + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + use proto::h2::SPEC_WINDOW_SIZE; + + self.h2_builder.adaptive_window = enabled; + if enabled { + self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; + self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; } self } @@ -295,9 +299,7 @@ impl Http { /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS pub fn http2_max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { - if let Some(max) = max.into() { - self.h2_builder.max_concurrent_streams(max); - } + self.h2_builder.max_concurrent_streams = max.into(); self } diff --git a/src/server/mod.rs b/src/server/mod.rs index 6479278764..c6a16a211b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -322,6 +322,16 @@ impl Builder { self } + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + pub fn http2_adaptive_window(mut self, enabled: bool) -> Self { + self.protocol.http2_adaptive_window(enabled); + self + } + /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 /// connections. ///