Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ext/http): Make http server parameters configurable #26785

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 85 additions & 10 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use std::io;
use std::pin::Pin;
use std::ptr::null;
use std::rc::Rc;
use std::time::Duration;

use super::fly_accept_encoding;
use fly_accept_encoding::Encoding;
Expand Down Expand Up @@ -165,6 +166,25 @@ pub enum HttpNextError {
UpgradeUnavailable(#[from] crate::service::UpgradeUnavailableError),
}

/// A set of configuration parameters for HTTP/2.
/// If a field is `None`, the default value from the hyper crate will be used.
/// The default values may change in future versions.
#[derive(Default, Clone, Copy)]
pub struct Http2Config {
pub max_pending_accept_reset_streams: Option<usize>,
pub max_local_error_reset_streams: Option<usize>,
pub initial_stream_window_size: Option<u32>,
pub initial_connection_window_size: Option<u32>,
pub adaptive_window: Option<bool>,
pub max_frame_size: Option<u32>,
pub max_concurrent_streams: Option<u32>,
pub keep_alive_interval: Option<Duration>,
pub keep_alive_timeout: Option<Duration>,
pub max_send_buf_size: Option<usize>,
pub max_header_list_size: Option<u32>,
pub auto_date_header: Option<bool>,
}

#[op2(fast)]
#[smi]
pub fn op_http_upgrade_raw(
Expand Down Expand Up @@ -829,9 +849,48 @@ fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
h2_config: Http2Config,
magurotuna marked this conversation as resolved.
Show resolved Hide resolved
) -> impl Future<Output = Result<(), hyper::Error>> + 'static {
let conn =
http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
let mut builder = http2::Builder::new(LocalExecutor);

if let Some(v) = h2_config.max_pending_accept_reset_streams {
builder.max_pending_accept_reset_streams(v);
}
if let Some(v) = h2_config.max_local_error_reset_streams {
builder = builder.max_local_error_reset_streams(v);
}
if let Some(v) = h2_config.initial_stream_window_size {
builder.initial_stream_window_size(v);
}
if let Some(v) = h2_config.initial_connection_window_size {
builder.initial_connection_window_size(v);
}
if let Some(v) = h2_config.adaptive_window {
builder.adaptive_window(v);
}
if let Some(v) = h2_config.max_frame_size {
builder.max_frame_size(v);
}
if let Some(v) = h2_config.max_concurrent_streams {
builder.max_concurrent_streams(v);
}
if let Some(v) = h2_config.keep_alive_interval {
builder.keep_alive_interval(v);
}
if let Some(v) = h2_config.keep_alive_timeout {
builder.keep_alive_timeout(v);
}
if let Some(v) = h2_config.max_send_buf_size {
builder.max_send_buf_size(v);
}
if let Some(v) = h2_config.max_header_list_size {
builder.max_header_list_size(v);
}
if let Some(v) = h2_config.auto_date_header {
builder.auto_date_header(v);
}

let conn = builder.serve_connection(TokioIo::new(io), svc);
async {
match conn.or_abort(cancel).await {
Err(mut conn) => {
Expand All @@ -847,11 +906,12 @@ async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
h2_config: Http2Config,
) -> Result<(), HttpNextError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
serve_http2_unconditional(io, svc, cancel)
serve_http2_unconditional(io, svc, cancel, h2_config)
.await
.map_err(HttpNextError::Hyper)
} else {
Expand All @@ -866,6 +926,7 @@ fn serve_https(
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
h2_config: Http2Config,
) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime {
server_state,
Expand All @@ -877,21 +938,21 @@ fn serve_https(
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
});
spawn(
async {
async move {
let handshake = io.handshake().await?;
// If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
// based on the prefix bytes
let handshake = handshake.alpn;
if Some(TLS_ALPN_HTTP_2) == handshake.as_deref() {
serve_http2_unconditional(io, svc, listen_cancel_handle)
serve_http2_unconditional(io, svc, listen_cancel_handle, h2_config)
.await
.map_err(HttpNextError::Hyper)
} else if Some(TLS_ALPN_HTTP_11) == handshake.as_deref() {
serve_http11_unconditional(io, svc, listen_cancel_handle)
.await
.map_err(HttpNextError::Hyper)
} else {
serve_http2_autodetect(io, svc, listen_cancel_handle).await
serve_http2_autodetect(io, svc, listen_cancel_handle, h2_config).await
}
}
.try_or_cancel(connection_cancel_handle),
Expand All @@ -903,6 +964,7 @@ fn serve_http(
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
h2_config: Http2Config,
) -> JoinHandle<Result<(), HttpNextError>> {
let HttpLifetime {
server_state,
Expand All @@ -914,7 +976,7 @@ fn serve_http(
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
});
spawn(
serve_http2_autodetect(io, svc, listen_cancel_handle)
serve_http2_autodetect(io, svc, listen_cancel_handle, h2_config)
.try_or_cancel(connection_cancel_handle),
)
}
Expand All @@ -924,6 +986,7 @@ fn serve_http_on<HTTP>(
listen_properties: &HttpListenProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
h2_config: Http2Config,
) -> JoinHandle<Result<(), HttpNextError>>
where
HTTP: HttpPropertyExtractor,
Expand All @@ -935,14 +998,14 @@ where

match network_stream {
NetworkStream::Tcp(conn) => {
serve_http(conn, connection_properties, lifetime, tx)
serve_http(conn, connection_properties, lifetime, tx, h2_config)
}
NetworkStream::Tls(conn) => {
serve_https(conn, connection_properties, lifetime, tx)
serve_https(conn, connection_properties, lifetime, tx, h2_config)
}
#[cfg(unix)]
NetworkStream::Unix(conn) => {
serve_http(conn, connection_properties, lifetime, tx)
serve_http(conn, connection_properties, lifetime, tx, h2_config)
}
}
}
Expand Down Expand Up @@ -1031,6 +1094,11 @@ where

let lifetime = resource.lifetime();

let h2_config = {
let state = state.borrow();
*state.borrow::<Http2Config>()
};

let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move {
loop {
Expand All @@ -1043,6 +1111,7 @@ where
&listen_properties_clone,
lifetime.clone(),
tx.clone(),
h2_config,
);
}
#[allow(unreachable_code)]
Expand Down Expand Up @@ -1079,11 +1148,17 @@ where
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));

let h2_config = {
let state = state.borrow();
*state.borrow::<Http2Config>()
};

let handle = serve_http_on::<HTTP>(
connection,
&listen_properties,
resource.lifetime(),
tx,
h2_config,
);

// Set the handle after we start the future
Expand Down
7 changes: 7 additions & 0 deletions ext/http/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ mod service;
mod websocket_upgrade;

use fly_accept_encoding::Encoding;
pub use http_next::Http2Config;
pub use http_next::HttpNextError;
pub use request_properties::DefaultHttpPropertyExtractor;
pub use request_properties::HttpConnectionProperties;
Expand Down Expand Up @@ -134,6 +135,12 @@ deno_core::extension!(
http_next::op_http_cancel,
],
esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
options = {
h2_config: Http2Config,
},
state = |state, options| {
state.put(options.h2_config);
}
);

#[derive(Debug, thiserror::Error)]
Expand Down
4 changes: 3 additions & 1 deletion runtime/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,9 @@ pub fn create_runtime_snapshot(
deno_cron::local::LocalCronHandler::new(),
),
deno_napi::deno_napi::init_ops_and_esm::<Permissions>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(
deno_http::Http2Config::default(),
),
deno_io::deno_io::init_ops_and_esm(Default::default()),
deno_fs::deno_fs::init_ops_and_esm::<Permissions>(fs.clone()),
deno_node::deno_node::init_ops_and_esm::<Permissions>(None, fs.clone()),
Expand Down
4 changes: 3 additions & 1 deletion runtime/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,9 @@ impl WebWorker {
),
deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(
deno_http::Http2Config::default(),
),
deno_io::deno_io::init_ops_and_esm(Some(options.stdio)),
deno_fs::deno_fs::init_ops_and_esm::<PermissionsContainer>(
services.fs.clone(),
Expand Down
4 changes: 3 additions & 1 deletion runtime/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,9 @@ impl MainWorker {
),
deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(
deno_http::Http2Config::default(),
),
deno_io::deno_io::init_ops_and_esm(Some(options.stdio)),
deno_fs::deno_fs::init_ops_and_esm::<PermissionsContainer>(
services.fs.clone(),
Expand Down