From e548b8b27695e234a4f672049159fdcfc44b3084 Mon Sep 17 00:00:00 2001 From: Aaron Arinder Date: Mon, 24 Nov 2025 15:20:24 -0500 Subject: [PATCH] feat(http2): allow header config for tcp/uds --- ...at_http2_header_size_limits_for_tcp_uds.md | 5 + apollo-router/src/axum_factory/listeners.rs | 142 ++++----- apollo-router/tests/common.rs | 20 +- .../fixtures/tcp.header_limited.router.yml | 7 + .../fixtures/unix.header_limited.router.yml | 7 + .../tests/integration/http_server.rs | 290 ++++++++++++++++++ 6 files changed, 395 insertions(+), 76 deletions(-) create mode 100644 .changesets/feat_http2_header_size_limits_for_tcp_uds.md create mode 100644 apollo-router/tests/integration/fixtures/tcp.header_limited.router.yml create mode 100644 apollo-router/tests/integration/fixtures/unix.header_limited.router.yml diff --git a/.changesets/feat_http2_header_size_limits_for_tcp_uds.md b/.changesets/feat_http2_header_size_limits_for_tcp_uds.md new file mode 100644 index 0000000000..a60cd46147 --- /dev/null +++ b/.changesets/feat_http2_header_size_limits_for_tcp_uds.md @@ -0,0 +1,5 @@ +### Enables HTTP/2 header size limits for TCP and UDS (unix sockets) + +The router config's HTTP/2 header size limit option is now respected by requests using TCP and UDS. Previously it would only work for TLS connections. + +By [@aaronArinder](https://github.com/aaronArinder) in https://github.com/apollographql/router/pull/8673 diff --git a/apollo-router/src/axum_factory/listeners.rs b/apollo-router/src/axum_factory/listeners.rs index 42b753ee7b..8dfb13c4fa 100644 --- a/apollo-router/src/axum_factory/listeners.rs +++ b/apollo-router/src/axum_factory/listeners.rs @@ -9,12 +9,14 @@ use std::time::Duration; use axum::Router; use axum::response::*; +use bytesize::ByteSize; use futures::channel::oneshot; use futures::prelude::*; use hyper_util::rt::TokioExecutor; use hyper_util::rt::TokioIo; use hyper_util::rt::TokioTimer; use hyper_util::server::conn::auto::Builder; +use hyper_util::server::conn::auto::Http1Builder; use multimap::MultiMap; #[cfg(unix)] use tokio::net::UnixListener; @@ -354,6 +356,24 @@ pub(super) fn serve_router_on_listen_addr( let _connection_stop_signal = connection_stop_signal; let connection_handle = ConnectionHandle::new(pipeline_ref, address); + // Development note: the following describes the different network + // streams and how to think about them when modifying the logic + // below. In general, a change to one stream (eg, TLS) should also + // have similar changes to the others unless there's a very good + // reason why it shouldn't be applied more broadly. Any changes to + // how the connections are configured should be centralized in + // configure_connections() + // + // Here are some pratical examples of each network stream below: + // - TCP :: after TLS termination (eg, reverse proxy), dev + // servers, or testing environments; ie, whenever https isn't + // used + // - Unix :: unix sockets, which are used widely in sidecars or + // when commnicating on the same machine, but can also be used + // by reverse proxies + // - TLS :: encrypted requests, often when the router is used to + // expose its graph to the public internet (ie, not behind a proxy) + match res { NetworkStream::Tcp(stream) => { let received_first_request = Arc::new(AtomicBool::new(false)); @@ -374,19 +394,8 @@ pub(super) fn serve_router_on_listen_addr( }); let mut builder = Builder::new(TokioExecutor::new()); - let mut http_connection = builder.http1(); - let http_config = http_connection - .keep_alive(true) - .timer(TokioTimer::new()) - .header_read_timeout(header_read_timeout); - if let Some(max_headers) = opt_max_http1_headers { - http_config.max_headers(max_headers); - } - - if let Some(max_buf_size) = opt_max_http1_buf_size{ - http_config.max_buf_size(max_buf_size.as_u64() as usize); - } - let connection = http_config.serve_connection_with_upgrades(tokio_stream, hyper_service); + let config = configure_connection(&mut builder, header_read_timeout, opt_max_http1_headers, opt_max_http1_buf_size, opt_max_http2_headers_list_bytes); + let connection = config.serve_connection_with_upgrades(tokio_stream, hyper_service); handle_connection!(connection, connection_handle, connection_shutdown, connection_shutdown_timeout, received_first_request); } #[cfg(unix)] @@ -398,19 +407,8 @@ pub(super) fn serve_router_on_listen_addr( app.clone().call(request) }); let mut builder = Builder::new(TokioExecutor::new()); - let mut http_connection = builder.http1(); - let http_config = http_connection - .keep_alive(true) - .timer(TokioTimer::new()) - .header_read_timeout(header_read_timeout); - if let Some(max_headers) = opt_max_http1_headers { - http_config.max_headers(max_headers); - } - - if let Some(max_buf_size) = opt_max_http1_buf_size { - http_config.max_buf_size(max_buf_size.as_u64() as usize); - } - let connection = http_config.serve_connection_with_upgrades(tokio_stream, hyper_service); + let config = configure_connection(&mut builder, header_read_timeout, opt_max_http1_headers, opt_max_http1_buf_size, opt_max_http2_headers_list_bytes); + let connection = config.serve_connection_with_upgrades(tokio_stream, hyper_service); handle_connection!(connection, connection_handle, connection_shutdown, connection_shutdown_timeout, received_first_request); }, NetworkStream::Tls(stream) => { @@ -427,52 +425,11 @@ pub(super) fn serve_router_on_listen_addr( app.clone().call(request) }); - // the builder can toggle between http/2 and http/1, which - // makes for somewhat confusing code below - let mut builder = Builder::new(TokioExecutor::new()); - - // ALPN (application-layer protocol negotiation) is an - // extension of TLS that lets the client/server agree on a - // protocol during the TLS handshake - if stream.get_ref().1.alpn_protocol() == Some(&b"h2"[..]) { - // if we see "h2", we're going to use only HTTP/2 - // WARN: from the docs, this doesn't do anything when - // used with serve_connection_with_upgrades(), which we - // use below - builder = builder.http2_only(); - - } - - let mut http2_config = builder.http2(); - if let Some(max_header_list_size) = opt_max_http2_headers_list_bytes { - // ByteSize as a .as_u64, but not as_u32, to explain - // this funky `as` conversion - http2_config.max_header_list_size(max_header_list_size.as_u64() as u32); - } - - // access the http1 config via the http2 builder doesn't - // erase the http2 config, only extends the overall - // configuration for connections - let mut http1_config = http2_config.http1(); - let http_config = http1_config - .keep_alive(true) - .timer(TokioTimer::new()) - .header_read_timeout(header_read_timeout); - - - if let Some(max_headers) = opt_max_http1_headers { - // NOTE: max headers has no effect on http2 - http_config.max_headers(max_headers); - } - - - if let Some(max_buf_size) = opt_max_http1_buf_size { - http_config.max_buf_size(max_buf_size.as_u64() as usize); - } - - let tokio_stream = TokioIo::new(stream); - let connection = http_config + + let mut builder = Builder::new(TokioExecutor::new()); + let config = configure_connection(&mut builder, header_read_timeout, opt_max_http1_headers, opt_max_http1_buf_size, opt_max_http2_headers_list_bytes); + let connection = config .serve_connection_with_upgrades(tokio_stream, hyper_service); handle_connection!(connection, connection_handle, connection_shutdown, connection_shutdown_timeout, received_first_request); @@ -496,6 +453,49 @@ pub(super) fn serve_router_on_listen_addr( (server, shutdown_sender) } +/// Configures a connection, no matter whether it's TLS, TCP, or UDS (ie, unix sockets) with +/// whatever parameters are configured by the end-user in their router config +/// +/// NOTE: centralize all connection configuration changes here so that the behavior of the +/// connections remains in-step +fn configure_connection( + conn_builder: &mut Builder, + header_read_timeout: Duration, + opt_max_http1_headers: Option, + opt_max_http1_buf_size: Option, + opt_max_http2_headers_list_bytes: Option, +) -> Http1Builder<'_, TokioExecutor> { + // NOTE: this is a builder that auto-detects http1/http2, so we can configure both and rely on + // it to figure out whether we have an http1 or http2 connection + let mut builder = conn_builder.http2(); + if let Some(max_header_list_size) = opt_max_http2_headers_list_bytes { + let max_header_list_size = u32::try_from(max_header_list_size.as_u64()) + .inspect_err(|e| tracing::warn!("attempted to use an HTTP/2 header size limit greater than is allowed by u32 (~4.3gb): {e}")) + .unwrap_or(u32::MAX); + builder.max_header_list_size(max_header_list_size); + } + + let mut builder = conn_builder.http1(); + + builder + .keep_alive(true) + .timer(TokioTimer::new()) + .header_read_timeout(header_read_timeout); + + if let Some(max_headers) = opt_max_http1_headers { + builder.max_headers(max_headers); + } + + if let Some(max_buf_size) = opt_max_http1_buf_size { + let max_buf_size = usize::try_from(max_buf_size.as_u64()) + .inspect_err(|e| tracing::warn!("attempted to use an HTTP/2 header size limit greater than is allowed by an unsized integer (quite large, multiple GBs, but platform-specific): {e}")) + .unwrap_or(usize::MAX); + builder.max_buf_size(max_buf_size); + } + + builder +} + #[derive(Clone)] struct IdleConnectionChecker { received_request: Arc, diff --git a/apollo-router/tests/common.rs b/apollo-router/tests/common.rs index 27968b9cfd..3a586d4b9e 100644 --- a/apollo-router/tests/common.rs +++ b/apollo-router/tests/common.rs @@ -1727,7 +1727,7 @@ fn merge_overrides( } // Override the listening address always since we spawn the router on a - // random port. + // random port. However, don't override Unix socket paths. match config .as_object_mut() .and_then(|o| o.get_mut("supergraph")) @@ -1744,10 +1744,20 @@ fn merge_overrides( } } Some(supergraph_conf) => { - supergraph_conf.insert( - "listen".to_string(), - serde_json::Value::String(bind_addr.to_string()), - ); + // check if the listen address is a Unix socket path (ie, starts with /) + let is_unix_socket = supergraph_conf + .get("listen") + .and_then(|v| v.as_str()) + .map(|s| s.starts_with('/')) + .unwrap_or(false); + + // only override if it's not a Unix socket + if !is_unix_socket { + supergraph_conf.insert( + "listen".to_string(), + serde_json::Value::String(bind_addr.to_string()), + ); + } } } diff --git a/apollo-router/tests/integration/fixtures/tcp.header_limited.router.yml b/apollo-router/tests/integration/fixtures/tcp.header_limited.router.yml new file mode 100644 index 0000000000..f036012891 --- /dev/null +++ b/apollo-router/tests/integration/fixtures/tcp.header_limited.router.yml @@ -0,0 +1,7 @@ +supergraph: + listen: 127.0.0.1:0 + +limits: + http1_max_request_headers: 100 + http1_max_request_buf_size: "16000" + http2_max_headers_list_bytes: "20Mib" diff --git a/apollo-router/tests/integration/fixtures/unix.header_limited.router.yml b/apollo-router/tests/integration/fixtures/unix.header_limited.router.yml new file mode 100644 index 0000000000..5d756ebc00 --- /dev/null +++ b/apollo-router/tests/integration/fixtures/unix.header_limited.router.yml @@ -0,0 +1,7 @@ +supergraph: + listen: /tmp/apollo_router_test_{{RANDOM}}.sock + +limits: + http1_max_request_headers: 100 + http1_max_request_buf_size: "16000" + http2_max_headers_list_bytes: "20Mib" diff --git a/apollo-router/tests/integration/http_server.rs b/apollo-router/tests/integration/http_server.rs index bc4fa3da11..77540b1104 100644 --- a/apollo-router/tests/integration/http_server.rs +++ b/apollo-router/tests/integration/http_server.rs @@ -1,5 +1,10 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + use http::StatusCode; use hyper_util::rt::TokioExecutor; +use rstest::rstest; use rustls::RootCertStore; use tower::BoxError; @@ -11,6 +16,11 @@ const SERVER_CERT: &str = include_str!("../../src/services/http/testdata/server_ const TLS_CONFIG: &str = include_str!("./fixtures/tls.router.yml"); const TLS_CONFIG_WITH_SMALL_H2_HEADER_LIMIT: &str = include_str!("./fixtures/tls.header_limited.router.yml"); +const TCP_CONFIG_WITH_H2_HEADER_LIMIT: &str = + include_str!("./fixtures/tcp.header_limited.router.yml"); +#[cfg(unix)] +const UNIX_CONFIG_WITH_H2_HEADER_LIMIT: &str = + include_str!("./fixtures/unix.header_limited.router.yml"); fn load_cert_to_root_store(cert_pem: &str) -> RootCertStore { let mut root_store = RootCertStore::empty(); @@ -227,3 +237,283 @@ async fn test_http2_max_header_list_size_within_limit() -> Result<(), BoxError> router.graceful_shutdown().await; Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +async fn test_tcp_max_header_list_size_exceeded() -> Result<(), BoxError> { + let mut router = IntegrationTest::builder() + .config(TCP_CONFIG_WITH_H2_HEADER_LIMIT) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let tcp_addr = router.bind_address(); + + // Create a custom connector for TCP + let connector = tower::service_fn(move |_uri: http::Uri| { + Box::pin(async move { + let stream = tokio::net::TcpStream::connect(tcp_addr).await?; + Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream)) + }) + }); + + let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) + .http2_only(true) + .build(connector); + + let uri: http::Uri = format!("http://{}/", tcp_addr).parse()?; + + // much bigger than the config's limit (20MiB)! this also tests that the hyper default (16kb) + // is overridden + let large_header_value = "x".repeat(21 * 1024 * 1024); + + let request = http::Request::builder() + .uri(uri) + .method("POST") + .header("content-type", "application/json") + .header("x-large-header", large_header_value) + .body(http_body_util::Full::new(bytes::Bytes::from( + r#"{"query":"{ __typename }"}"#, + )))?; + + let response = client.request(request).await?; + + assert_eq!( + response.status(), + StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, + "Expected 431 Request Header Fields Too Large when header list exceeds 20MiB limit for TCP" + ); + + router.graceful_shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_tcp_max_header_list_size_within_limit() -> Result<(), BoxError> { + let mut router = IntegrationTest::builder() + .config(TCP_CONFIG_WITH_H2_HEADER_LIMIT) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let tcp_addr = router.bind_address(); + + // Create a custom connector for TCP + let connector = tower::service_fn(move |_uri: http::Uri| { + Box::pin(async move { + let stream = tokio::net::TcpStream::connect(tcp_addr).await?; + Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream)) + }) + }); + + let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) + .http2_only(true) + .build(connector); + + let uri: http::Uri = format!("http://{}/", tcp_addr).parse()?; + + // create a header value that stays within the 20MiB limit of the config + let acceptable_header_value = "y".repeat(10 * 1024 * 1024); + + let request = http::Request::builder() + .uri(uri) + .method("POST") + .header("content-type", "application/json") + .header("x-medium-header", acceptable_header_value) + .body(http_body_util::Full::new(bytes::Bytes::from( + r#"{"query":"{ __typename }"}"#, + )))?; + + let response = client.request(request).await?; + + assert_eq!( + response.status(), + StatusCode::OK, + "Expected successful response when header list is within 20MiB limit for TCP" + ); + assert_eq!( + response.version(), + http::Version::HTTP_2, + "Expected HTTP/2 to be negotiated for TCP" + ); + + router.graceful_shutdown().await; + Ok(()) +} + +enum HttpProtocol { + Http1, + Http2, +} + +// both http1 and http2 have connection persistence by default; http1 uses keep-alive, but since +// http2 uses a single connection with multiple requests, the headers sent to intermediate servers +// can't be used as connection-specific headers because connections are no longer identifiable with +// a single request; so, for http2 connections default to persistently open and only close when +// explicitly closed by the client or server (via GOAWAY frames, eg) +// +// this happens as the default, so the tests below only test the persistence of connections rather +// than the explicit headers (for http1, eg) to make sure that we haven't broken anything or that +// there wasn't a regression in any of the libraries we use breaking something +#[tokio::test(flavor = "multi_thread")] +#[rstest] +#[case::http1_conn_persistence(HttpProtocol::Http1)] +#[case::http2_conn_persistence(HttpProtocol::Http2)] +async fn test_http1_connection_persistence( + #[case] http_protocol: HttpProtocol, +) -> Result<(), BoxError> { + let mut router = IntegrationTest::builder() + .config( + r#" + supergraph: + listen: localhost:80 + "#, + ) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let addr = router.bind_address(); + + // using an Arc to count connections across async boundaries + let connection_count = Arc::new(AtomicUsize::new(0)); + let connection_count_clone = connection_count.clone(); + + let connector = tower::service_fn(move |uri: http::Uri| { + let connection_count = connection_count_clone.clone(); + Box::pin(async move { + // Increment connection counter each time a new connection is established + connection_count.fetch_add(1, Ordering::SeqCst); + let stream = tokio::net::TcpStream::connect(format!( + "{}:{}", + uri.host().unwrap_or("localhost"), + uri.port_u16().unwrap_or(80) + )) + .await?; + Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(stream)) + }) + }); + + let is_http2 = matches!(http_protocol, HttpProtocol::Http2); + let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) + .http2_only(is_http2) + .build(connector); + + let uri: http::Uri = format!("http://{}/", addr).parse()?; + + // same client, multiple requests + let num_requests = 5; + for i in 0..num_requests { + let request = http::Request::builder() + .uri(uri.clone()) + .method("POST") + .header("content-type", "application/json") + .body(http_body_util::Full::new(bytes::Bytes::from( + r#"{"query":"{ __typename }"}"#, + )))?; + + let response = client.request(request).await?; + + // keep-alive is the default; so, the header might not be there, but we only care if the + // connection remains open (ie, doesn't contain 'close') + let connection_header = response.headers().get(http::header::CONNECTION); + if let Some(value) = connection_header { + let value_str = value.to_str().unwrap_or(""); + assert!( + !value_str.contains("close"), + "Connection should not be closed, got: {} on request {}", + value_str, + i + 1 + ); + } + } + + // this is the core thing to check for keep-alive: that the number of connections is fewer than + // the number of requests, showing re-use + let total_connections = connection_count.load(Ordering::SeqCst); + assert!( + total_connections < num_requests, + "Expected connection reuse: {} connections should be less than {} requests", + total_connections, + num_requests + ); + + router.graceful_shutdown().await; + Ok(()) +} + +#[cfg(unix)] +mod unix_tests { + use hyper_util::rt::TokioIo; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + #[rstest] + #[case::header_within_limits_of_config(UNIX_CONFIG_WITH_H2_HEADER_LIMIT, "y".repeat(10*1024*1024), StatusCode::OK)] + #[case::header_bigger_than_config(UNIX_CONFIG_WITH_H2_HEADER_LIMIT, "n".repeat(21*1024*1024), StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE)] + async fn test_unix_socket_max_header_list_size( + #[case] config: &str, + #[case] header: String, + #[case] status_code: StatusCode, + ) -> Result<(), BoxError> { + use uuid::Uuid; + + // generate a unique socket path to avoid conflicts + let uuid = Uuid::new_v4().simple().to_string(); + let socket_path = format!("/tmp/apollo_router_test_{}.sock", uuid); + let config = config.replace("{{RANDOM}}", &uuid); + + let mut router = IntegrationTest::builder().config(&config).build().await; + + router.start().await; + router.assert_started().await; + + // connect directly to the Unix socket using HTTP/2 + let stream = tokio::net::UnixStream::connect(&socket_path).await?; + let (mut sender, conn) = + hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(stream)) + .await?; + + tokio::task::spawn(async move { + if let Err(err) = conn.await { + eprintln!("Connection failed: {err:?}"); + } + }); + + let request = http::Request::builder() + .uri("http://localhost/") + .method("POST") + .header("content-type", "application/json") + .header("x-target-header", header) + .body(http_body_util::Full::new(bytes::Bytes::from( + r#"{"query":"{ __typename }"}"#, + )))?; + + let response = sender.send_request(request).await?; + + assert_eq!( + response.status(), + status_code, + "Expected status code {:?} for Unix socket with header size test", + status_code + ); + assert_eq!( + response.version(), + http::Version::HTTP_2, + "Expected HTTP/2 to be negotiated for Unix socket" + ); + + router.graceful_shutdown().await; + + // clean up the socket file + let _ = std::fs::remove_file(&socket_path); + + Ok(()) + } +}