diff --git a/apollo-router/src/axum_factory/tests.rs b/apollo-router/src/axum_factory/tests.rs index 0e53641a66..25f04fb6ad 100644 --- a/apollo-router/src/axum_factory/tests.rs +++ b/apollo-router/src/axum_factory/tests.rs @@ -70,11 +70,11 @@ use crate::http_server_factory::HttpServerFactory; use crate::http_server_factory::HttpServerHandle; use crate::json_ext::Path; use crate::metrics::FutureMetricsExt; -use crate::plugins::content_negotiation::MULTIPART_DEFER_ACCEPT_HEADER_VALUE; -use crate::plugins::content_negotiation::MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE; use crate::plugins::healthcheck::Config as HealthCheck; use crate::router_factory::Endpoint; use crate::router_factory::RouterFactory; +use crate::services::MULTIPART_DEFER_ACCEPT; +use crate::services::MULTIPART_DEFER_CONTENT_TYPE; use crate::services::RouterRequest; use crate::services::RouterResponse; use crate::services::SupergraphResponse; @@ -1723,7 +1723,7 @@ async fn deferred_response_shape() -> Result<(), ApolloRouterError> { let mut response = client .post(&url) .body(query.to_string()) - .header(ACCEPT, MULTIPART_DEFER_ACCEPT_HEADER_VALUE) + .header(ACCEPT, HeaderValue::from_static(MULTIPART_DEFER_ACCEPT)) .send() .await .unwrap(); @@ -1731,7 +1731,7 @@ async fn deferred_response_shape() -> Result<(), ApolloRouterError> { assert_eq!(response.status(), StatusCode::OK); assert_eq!( response.headers().get(CONTENT_TYPE), - Some(&MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE) + Some(&HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE)) ); let first = response.chunk().await.unwrap().unwrap(); @@ -1783,7 +1783,7 @@ async fn multipart_response_shape_with_one_chunk() -> Result<(), ApolloRouterErr let mut response = client .post(&url) .body(query.to_string()) - .header(ACCEPT, MULTIPART_DEFER_ACCEPT_HEADER_VALUE) + .header(ACCEPT, HeaderValue::from_static(MULTIPART_DEFER_ACCEPT)) .send() .await .unwrap(); @@ -1791,7 +1791,7 @@ async fn multipart_response_shape_with_one_chunk() -> Result<(), ApolloRouterErr assert_eq!(response.status(), StatusCode::OK); assert_eq!( response.headers().get(CONTENT_TYPE), - Some(&MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE) + Some(&HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE)) ); let first = response.chunk().await.unwrap().unwrap(); diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 32e5517adf..b43ac855bc 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -1342,8 +1342,8 @@ expression: "&schema" "description": "Telemetry configuration", "properties": { "apollo": { - "$ref": "#/definitions/Config14", - "description": "#/definitions/Config14" + "$ref": "#/definitions/Config13", + "description": "#/definitions/Config13" }, "exporters": { "$ref": "#/definitions/Exporters", @@ -1382,30 +1382,6 @@ expression: "&schema" "type": "object" }, "Config10": { - "additionalProperties": false, - "description": "Configuration for header propagation", - "properties": { - "all": { - "$ref": "#/definitions/HeadersLocation", - "description": "#/definitions/HeadersLocation", - "nullable": true - }, - "connector": { - "$ref": "#/definitions/ConnectorHeadersConfiguration", - "description": "#/definitions/ConnectorHeadersConfiguration" - }, - "subgraphs": { - "additionalProperties": { - "$ref": "#/definitions/HeadersLocation", - "description": "#/definitions/HeadersLocation" - }, - "description": "Rules to specific subgraphs", - "type": "object" - } - }, - "type": "object" - }, - "Config11": { "additionalProperties": false, "description": "Configuration for exposing errors that originate from subgraphs", "properties": { @@ -1425,7 +1401,7 @@ expression: "&schema" }, "type": "object" }, - "Config12": { + "Config11": { "additionalProperties": false, "description": "Configuration for entity caching", "properties": { @@ -1458,11 +1434,11 @@ expression: "&schema" ], "type": "object" }, - "Config13": { + "Config12": { "description": "Configuration for the progressive override plugin", "type": "object" }, - "Config14": { + "Config13": { "additionalProperties": false, "properties": { "batch_processor": { @@ -1538,7 +1514,7 @@ expression: "&schema" }, "type": "object" }, - "Config15": { + "Config14": { "additionalProperties": false, "properties": { "batch_processor": { @@ -1577,7 +1553,7 @@ expression: "&schema" ], "type": "object" }, - "Config16": { + "Config15": { "additionalProperties": false, "description": "Prometheus configuration", "properties": { @@ -1602,7 +1578,7 @@ expression: "&schema" }, "type": "object" }, - "Config17": { + "Config16": { "additionalProperties": false, "properties": { "batch_processor": { @@ -1623,7 +1599,7 @@ expression: "&schema" ], "type": "object" }, - "Config18": { + "Config17": { "additionalProperties": false, "properties": { "batch_processor": { @@ -1682,7 +1658,7 @@ expression: "&schema" ], "type": "object" }, - "Config19": { + "Config18": { "additionalProperties": false, "description": "Configuration for the experimental traffic shaping plugin", "properties": { @@ -1868,9 +1844,6 @@ expression: "&schema" "type": "object" }, "Config8": { - "type": "object" - }, - "Config9": { "additionalProperties": false, "description": "Configuration for entity caching", "properties": { @@ -1903,6 +1876,30 @@ expression: "&schema" ], "type": "object" }, + "Config9": { + "additionalProperties": false, + "description": "Configuration for header propagation", + "properties": { + "all": { + "$ref": "#/definitions/HeadersLocation", + "description": "#/definitions/HeadersLocation", + "nullable": true + }, + "connector": { + "$ref": "#/definitions/ConnectorHeadersConfiguration", + "description": "#/definitions/ConnectorHeadersConfiguration" + }, + "subgraphs": { + "additionalProperties": { + "$ref": "#/definitions/HeadersLocation", + "description": "#/definitions/HeadersLocation" + }, + "description": "Rules to specific subgraphs", + "type": "object" + } + }, + "type": "object" + }, "ConnectorAttributes": { "additionalProperties": false, "properties": { @@ -4500,12 +4497,12 @@ expression: "&schema" "description": "#/definitions/MetricsCommon" }, "otlp": { - "$ref": "#/definitions/Config15", - "description": "#/definitions/Config15" + "$ref": "#/definitions/Config14", + "description": "#/definitions/Config14" }, "prometheus": { - "$ref": "#/definitions/Config16", - "description": "#/definitions/Config16" + "$ref": "#/definitions/Config15", + "description": "#/definitions/Config15" } }, "type": "object" @@ -8208,24 +8205,24 @@ expression: "&schema" "description": "#/definitions/TracingCommon" }, "datadog": { - "$ref": "#/definitions/Config18", - "description": "#/definitions/Config18" + "$ref": "#/definitions/Config17", + "description": "#/definitions/Config17" }, "experimental_response_trace_id": { "$ref": "#/definitions/ExposeTraceId", "description": "#/definitions/ExposeTraceId" }, "otlp": { - "$ref": "#/definitions/Config15", - "description": "#/definitions/Config15" + "$ref": "#/definitions/Config14", + "description": "#/definitions/Config14" }, "propagation": { "$ref": "#/definitions/Propagation", "description": "#/definitions/Propagation" }, "zipkin": { - "$ref": "#/definitions/Config17", - "description": "#/definitions/Config17" + "$ref": "#/definitions/Config16", + "description": "#/definitions/Config16" } }, "type": "object" @@ -9342,8 +9339,8 @@ expression: "&schema" "type": "object" }, "^experimental_response_cache$": { - "$ref": "#/definitions/Config9", - "description": "#/definitions/Config9" + "$ref": "#/definitions/Config8", + "description": "#/definitions/Config8" } }, "properties": { @@ -9367,10 +9364,6 @@ expression: "&schema" "$ref": "#/definitions/ConnectorsConfig", "description": "#/definitions/ConnectorsConfig" }, - "content_negotiation": { - "$ref": "#/definitions/Config7", - "description": "#/definitions/Config7" - }, "coprocessor": { "$ref": "#/definitions/Conf4", "description": "#/definitions/Conf4" @@ -9388,8 +9381,8 @@ expression: "&schema" "description": "#/definitions/DemandControlConfig" }, "enhanced_client_awareness": { - "$ref": "#/definitions/Config8", - "description": "#/definitions/Config8" + "$ref": "#/definitions/Config7", + "description": "#/definitions/Config7" }, "experimental_chaos": { "$ref": "#/definitions/Chaos", @@ -9409,8 +9402,8 @@ expression: "&schema" "description": "#/definitions/ForbidMutationsConfig" }, "headers": { - "$ref": "#/definitions/Config10", - "description": "#/definitions/Config10" + "$ref": "#/definitions/Config9", + "description": "#/definitions/Config9" }, "health_check": { "$ref": "#/definitions/Config", @@ -9421,8 +9414,8 @@ expression: "&schema" "description": "#/definitions/Homepage" }, "include_subgraph_errors": { - "$ref": "#/definitions/Config11", - "description": "#/definitions/Config11" + "$ref": "#/definitions/Config10", + "description": "#/definitions/Config10" }, "license_enforcement": { "$ref": "#/definitions/LicenseEnforcementConfig", @@ -9445,16 +9438,16 @@ expression: "&schema" "description": "#/definitions/Plugins" }, "preview_entity_cache": { - "$ref": "#/definitions/Config12", - "description": "#/definitions/Config12" + "$ref": "#/definitions/Config11", + "description": "#/definitions/Config11" }, "preview_file_uploads": { "$ref": "#/definitions/FileUploadsConfig", "description": "#/definitions/FileUploadsConfig" }, "progressive_override": { - "$ref": "#/definitions/Config13", - "description": "#/definitions/Config13" + "$ref": "#/definitions/Config12", + "description": "#/definitions/Config12" }, "rhai": { "$ref": "#/definitions/Conf7", @@ -9485,8 +9478,8 @@ expression: "&schema" "description": "#/definitions/Tls" }, "traffic_shaping": { - "$ref": "#/definitions/Config19", - "description": "#/definitions/Config19" + "$ref": "#/definitions/Config18", + "description": "#/definitions/Config18" } }, "title": "Configuration", diff --git a/apollo-router/src/plugins/authorization/authenticated.rs b/apollo-router/src/plugins/authorization/authenticated.rs index 62bed55c92..97e2d4c3f6 100644 --- a/apollo-router/src/plugins/authorization/authenticated.rs +++ b/apollo-router/src/plugins/authorization/authenticated.rs @@ -538,7 +538,7 @@ mod tests { use crate::plugin::test::MockSubgraph; use crate::plugins::authorization::APOLLO_AUTHENTICATION_JWT_CLAIMS; use crate::plugins::authorization::authenticated::AuthenticatedVisitor; - use crate::plugins::content_negotiation::ClientRequestAccepts; + use crate::services::router::ClientRequestAccepts; use crate::services::supergraph; use crate::spec::query::transform; diff --git a/apollo-router/src/plugins/content_negotiation.rs b/apollo-router/src/plugins/content_negotiation.rs deleted file mode 100644 index d982ca45ba..0000000000 --- a/apollo-router/src/plugins/content_negotiation.rs +++ /dev/null @@ -1,453 +0,0 @@ -//! The content negotiation plugin performs HTTP content negotiation using the `accept` and -//! `content-type` headers, working at the router stage. -use std::ops::ControlFlow; - -use http::HeaderMap; -use http::HeaderValue; -use http::Method; -use http::StatusCode; -use http::header::ACCEPT; -use http::header::CONTENT_TYPE; -use http::header::VARY; -use mediatype::MediaType; -use mediatype::MediaTypeList; -use mediatype::ReadParams; -use schemars::JsonSchema; -use serde::Deserialize; -use tower::BoxError; -use tower::ServiceBuilder; -use tower::ServiceExt; - -use crate::graphql; -use crate::layers::ServiceBuilderExt; -use crate::plugin::Plugin; -use crate::plugin::PluginInit; -use crate::protocols::multipart::ProtocolMode; -use crate::services::router; -use crate::services::router::body::RouterBody; - -register_plugin!("apollo", "content_negotiation", ContentNegotiation); - -const APPLICATION_JSON: &str = "application/json"; -pub(crate) const APPLICATION_GRAPHQL_JSON: &str = "application/graphql-response+json"; - -const ORIGIN_HEADER_VALUE: HeaderValue = HeaderValue::from_static("origin"); - -// set the supported `@defer` specification version to https://github.com/graphql/graphql-spec/pull/742/commits/01d7b98f04810c9a9db4c0e53d3c4d54dbf10b82 -const MULTIPART_DEFER_SPEC_PARAMETER: &str = "deferSpec"; -const MULTIPART_DEFER_SPEC_VALUE: &str = "20220824"; -pub(crate) const MULTIPART_DEFER_ACCEPT_HEADER_VALUE: HeaderValue = - HeaderValue::from_static("multipart/mixed;deferSpec=20220824"); -pub(crate) const MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE: HeaderValue = - HeaderValue::from_static("multipart/mixed;boundary=\"graphql\";deferSpec=20220824"); - -const MULTIPART_SUBSCRIPTION_ACCEPT: &str = "multipart/mixed;subscriptionSpec=1.0"; -const MULTIPART_SUBSCRIPTION_SPEC_PARAMETER: &str = "subscriptionSpec"; -const MULTIPART_SUBSCRIPTION_SPEC_VALUE: &str = "1.0"; -pub(crate) const MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE: HeaderValue = - HeaderValue::from_static("multipart/mixed;boundary=\"graphql\";subscriptionSpec=1.0"); - -/// The `ClientRequestAccepts` struct is effectively a parsed version of a request's `accept` header. -/// -/// Note that multiple values here can be set to true. For example, if the request provides -/// header value `application/json,*/*`, both `json` and `wildcard` in the struct will be set to true. -#[derive(Clone, Default, Debug)] -pub(crate) struct ClientRequestAccepts { - pub(crate) multipart_defer: bool, - pub(crate) multipart_subscription: bool, - pub(crate) json: bool, - pub(crate) wildcard: bool, -} - -impl ClientRequestAccepts { - /// Returns true if any of the struct's members are true, ie the request includes an `accept` - /// value that the router supports. - fn is_valid(&self) -> bool { - self.json || self.wildcard || self.multipart_defer || self.multipart_subscription - } -} - -/// The `ContentNegotiation` plugin provides request and response layers at the router service. -/// -/// # Request -/// The request layer rejects requests that do not have an expected `Content-Type`, or that have an -/// `Accept` header that is not supported by the router. -/// -/// In particular: -/// * the request must be a `GET` or have `CONTENT_TYPE = JSON`, and -/// * the accept header must include `*/*`, one of the JSON/GraphQL MIME types, or one of the -/// multipart types. -/// -/// It will also add a `ClientRequestAccepts` value to the context if the request is valid. -/// -/// # Response -/// The response layer sets the `CONTENT_TYPE` header, using the `ClientRequestAccepts` value from -/// the context (set on the request side of this plugin). It will also set the `VARY` header if it -/// is not present. -/// -/// # Context -/// If the request is valid, this layer adds a [`ClientRequestAccepts`] value to the context. -struct ContentNegotiation {} -#[derive(Debug, Default, Deserialize, JsonSchema)] -struct Config {} - -impl ContentNegotiation { - fn handle_request(request: router::Request) -> ControlFlow { - let valid_content_type_header = request.router_request.method() == Method::GET - || content_type_includes_json(request.router_request.headers()); - if !valid_content_type_header { - return ControlFlow::Break(invalid_content_type_header_response().into()); - } - - let accepts = parse_accept_header(request.router_request.headers()); - if !accepts.is_valid() { - return ControlFlow::Break(invalid_accept_header_response().into()); - } - - request - .context - .extensions() - .with_lock(|lock| lock.insert(accepts)); - ControlFlow::Continue(request) - } - - fn handle_response(mut response: router::Response) -> router::Response { - let ClientRequestAccepts { - multipart_defer: accepts_multipart_defer, - multipart_subscription: accepts_multipart_subscription, - json: accepts_json, - wildcard: accepts_wildcard, - } = response.context.extensions().with_lock(|lock| { - lock.get::() - .cloned() - .unwrap_or_default() - }); - - let headers = response.response.headers_mut(); - process_vary_header(headers); - - let protocol_mode = response - .context - .extensions() - .with_lock(|lock| lock.get::().cloned()); - - let content_type = match protocol_mode { - Some(ProtocolMode::Defer) if accepts_multipart_defer => { - MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE - } - Some(ProtocolMode::Subscription) if accepts_multipart_subscription => { - MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE - } - None if accepts_json || accepts_wildcard => HeaderValue::from_static(APPLICATION_JSON), - _ => { - // XX(@carodewig) this should be unreachable, but provide fallback of APPLICATION_JSON - HeaderValue::from_static(APPLICATION_JSON) - } - }; - headers.insert(CONTENT_TYPE, content_type); - - response - } -} - -#[async_trait::async_trait] -impl Plugin for ContentNegotiation { - type Config = Config; - - async fn new(_init: PluginInit) -> Result - where - Self: Sized, - { - Ok(ContentNegotiation {}) - } - - fn router_service(&self, service: router::BoxService) -> router::BoxService { - ServiceBuilder::new() - .checkpoint(|request: router::Request| Ok(Self::handle_request(request))) - .service(service) - .map_response(Self::handle_response) - .boxed() - } -} - -/// Returns `true` if the media type is either `application/json` or `application/graphql-response+json`. -fn is_json_type(mime: &MediaType) -> bool { - use mediatype::names::APPLICATION; - use mediatype::names::JSON; - let is_json = |mime: &MediaType| mime.subty == JSON; - let is_gql_json = - |mime: &MediaType| mime.subty.as_str() == "graphql-response" && mime.suffix == Some(JSON); - - mime.ty == APPLICATION && (is_json(mime) || is_gql_json(mime)) -} - -/// Returns `true` if the media type is `*/*`. -fn is_wildcard(mime: &MediaType) -> bool { - use mediatype::names::_STAR; - mime.ty == _STAR && mime.subty == _STAR -} - -/// Returns `true` if media type is a multipart defer, ie `multipart/mixed;deferSpec=20220824`. -fn is_multipart_defer(mime: &MediaType) -> bool { - use mediatype::names::MIXED; - use mediatype::names::MULTIPART; - - let Some(parameter) = mediatype::Name::new(MULTIPART_DEFER_SPEC_PARAMETER) else { - return false; - }; - let Some(value) = mediatype::Value::new(MULTIPART_DEFER_SPEC_VALUE) else { - return false; - }; - - mime.ty == MULTIPART && mime.subty == MIXED && mime.get_param(parameter) == Some(value) -} - -/// Returns `true` if media type is a multipart subscription, ie `multipart/mixed;subscriptionSpec=1.0`. -fn is_multipart_subscription(mime: &MediaType) -> bool { - use mediatype::names::MIXED; - use mediatype::names::MULTIPART; - - let Some(parameter) = mediatype::Name::new(MULTIPART_SUBSCRIPTION_SPEC_PARAMETER) else { - return false; - }; - let Some(value) = mediatype::Value::new(MULTIPART_SUBSCRIPTION_SPEC_VALUE) else { - return false; - }; - - mime.ty == MULTIPART && mime.subty == MIXED && mime.get_param(parameter) == Some(value) -} - -/// Returns `true` if the `CONTENT_TYPE` header contains `application/json` or -/// `application/graphql-response+json`. -fn content_type_includes_json(headers: &HeaderMap) -> bool { - headers - .get_all(CONTENT_TYPE) - .iter() - .filter_map(|header| header.to_str().ok()) - .flat_map(MediaTypeList::new) - .any(|mime_result| mime_result.as_ref().is_ok_and(is_json_type)) -} - -/// Builds and returns `ClientRequestAccepts` from the `ACCEPT` content header. -fn parse_accept_header(headers: &HeaderMap) -> ClientRequestAccepts { - let mut accept_header_present = false; - let mut accepts = ClientRequestAccepts::default(); - - headers - .get_all(ACCEPT) - .iter() - .filter_map(|header| { - accept_header_present = true; - header.to_str().ok() - }) - .flat_map(MediaTypeList::new) - .flatten() - .for_each(|mime| { - accepts.json = accepts.json || is_json_type(&mime); - accepts.wildcard = accepts.wildcard || is_wildcard(&mime); - accepts.multipart_defer = accepts.multipart_defer || is_multipart_defer(&mime); - accepts.multipart_subscription = - accepts.multipart_subscription || is_multipart_subscription(&mime); - }); - - if !accept_header_present { - accepts.json = true; - } - - accepts -} - -/// Process the headers to make sure that `VARY` is set correctly. -fn process_vary_header(headers: &mut HeaderMap) { - if headers.get(VARY).is_none() { - // We don't have a VARY header, add one with value "origin" - headers.insert(VARY, ORIGIN_HEADER_VALUE); - } -} - -/// Helper to build a `RouterBody` containing a `graphql::Error` with the provided extension -/// code and message. -fn error_response_body(extension_code: &str, message: String) -> RouterBody { - router::body::from_bytes( - serde_json::json!({ - "errors": [ - graphql::Error::builder() - .message(message) - .extension_code(extension_code) - .build() - ] - }) - .to_string(), - ) -} - -/// Helper to build an HTTP response with a standardized invalid `CONTENT_TYPE` header message. -fn invalid_content_type_header_response() -> http::Response { - let message = format!( - r#"'content-type' header must be one of: {:?} or {:?}"#, - APPLICATION_JSON, APPLICATION_GRAPHQL_JSON, - ); - http::Response::builder() - .status(StatusCode::UNSUPPORTED_MEDIA_TYPE) - .header(CONTENT_TYPE, HeaderValue::from_static(APPLICATION_JSON)) - .body(error_response_body("INVALID_CONTENT_TYPE_HEADER", message)) - .expect("cannot fail") -} - -/// Helper to build an HTTP response with a standardized invalid `ACCEPT` header message. -pub(crate) fn invalid_accept_header_response() -> http::Response { - let message = format!( - r#"'accept' header must be one of: \"*/*\", {:?}, {:?}, {:?} or {:?}"#, - APPLICATION_JSON, - APPLICATION_GRAPHQL_JSON, - MULTIPART_SUBSCRIPTION_ACCEPT, - MULTIPART_DEFER_ACCEPT_HEADER_VALUE - ); - http::Response::builder() - .status(StatusCode::NOT_ACCEPTABLE) - .header(CONTENT_TYPE, HeaderValue::from_static(APPLICATION_JSON)) - .body(error_response_body("INVALID_ACCEPT_HEADER", message)) - .expect("cannot fail") -} - -#[cfg(test)] -mod tests { - use http::HeaderMap; - use http::header::ACCEPT; - use http::header::CONTENT_TYPE; - use http::header::HeaderValue; - use http::header::VARY; - - use super::APPLICATION_GRAPHQL_JSON; - use super::APPLICATION_JSON; - use super::MULTIPART_DEFER_ACCEPT_HEADER_VALUE; - use super::content_type_includes_json; - use super::parse_accept_header; - use super::process_vary_header; - - const VALID_CONTENT_TYPES: [&str; 2] = [APPLICATION_JSON, APPLICATION_GRAPHQL_JSON]; - const INVALID_CONTENT_TYPES: [&str; 3] = ["invalid", "application/invalid", "application/yaml"]; - - #[test] - fn test_content_type_includes_json_handles_valid_content_types() { - for content_type in VALID_CONTENT_TYPES { - let mut headers = HeaderMap::new(); - headers.insert(CONTENT_TYPE, HeaderValue::from_static(content_type)); - assert!(content_type_includes_json(&headers)); - } - } - - #[test] - fn test_content_type_includes_json_handles_invalid_content_types() { - for content_type in INVALID_CONTENT_TYPES { - let mut headers = HeaderMap::new(); - headers.insert(CONTENT_TYPE, HeaderValue::from_static(content_type)); - assert!(!content_type_includes_json(&headers)); - } - } - - #[test] - fn test_content_type_includes_json_can_process_multiple_content_types() { - let mut headers = HeaderMap::new(); - for content_type in INVALID_CONTENT_TYPES { - headers.insert(CONTENT_TYPE, HeaderValue::from_static(content_type)); - } - for content_type in VALID_CONTENT_TYPES { - headers.insert(CONTENT_TYPE, HeaderValue::from_static(content_type)); - } - - assert!(content_type_includes_json(&headers)); - - let mut headers = HeaderMap::new(); - headers.insert( - CONTENT_TYPE, - INVALID_CONTENT_TYPES.join(", ").parse().unwrap(), - ); - headers.insert( - CONTENT_TYPE, - VALID_CONTENT_TYPES.join(", ").parse().unwrap(), - ); - assert!(content_type_includes_json(&headers)); - } - - #[test] - fn test_parse_accept_header_behaves_as_expected() { - let mut default_headers = HeaderMap::new(); - default_headers.insert(ACCEPT, HeaderValue::from_static(VALID_CONTENT_TYPES[0])); - default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); - let accepts = parse_accept_header(&default_headers); - assert!(accepts.json); - - let mut default_headers = HeaderMap::new(); - default_headers.insert(ACCEPT, HeaderValue::from_static("*/*")); - default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); - let accepts = parse_accept_header(&default_headers); - assert!(accepts.wildcard); - - let mut default_headers = HeaderMap::new(); - // real life browser example - default_headers.insert(ACCEPT, HeaderValue::from_static("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8")); - let accepts = parse_accept_header(&default_headers); - assert!(accepts.wildcard); - - let mut default_headers = HeaderMap::new(); - default_headers.insert(ACCEPT, HeaderValue::from_static(APPLICATION_GRAPHQL_JSON)); - default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); - let accepts = parse_accept_header(&default_headers); - assert!(accepts.json); - - let mut default_headers = HeaderMap::new(); - default_headers.insert(ACCEPT, HeaderValue::from_static(APPLICATION_GRAPHQL_JSON)); - default_headers.append(ACCEPT, MULTIPART_DEFER_ACCEPT_HEADER_VALUE); - let accepts = parse_accept_header(&default_headers); - assert!(accepts.multipart_defer); - - // Multiple accepted types, including one with a parameter we are interested in - let mut default_headers = HeaderMap::new(); - default_headers.insert( - ACCEPT, - HeaderValue::from_static("multipart/mixed;subscriptionSpec=1.0, application/json"), - ); - let accepts = parse_accept_header(&default_headers); - assert!(accepts.multipart_subscription); - - // No accept header present - let default_headers = HeaderMap::new(); - let accepts = parse_accept_header(&default_headers); - assert!(accepts.json); - } - - #[test] - fn it_adds_default_with_value_origin_if_no_vary_header() { - let mut default_headers = HeaderMap::new(); - process_vary_header(&mut default_headers); - let vary_opt = default_headers.get(VARY); - assert!(vary_opt.is_some()); - let vary = vary_opt.expect("has a value"); - assert_eq!(vary, "origin"); - } - - #[test] - fn it_leaves_vary_alone_if_set() { - let mut default_headers = HeaderMap::new(); - default_headers.insert(VARY, HeaderValue::from_static("*")); - process_vary_header(&mut default_headers); - let vary_opt = default_headers.get(VARY); - assert!(vary_opt.is_some()); - let vary = vary_opt.expect("has a value"); - assert_eq!(vary, "*"); - } - - #[test] - fn it_leaves_varys_alone_if_there_are_more_than_one() { - let mut default_headers = HeaderMap::new(); - default_headers.insert(VARY, HeaderValue::from_static("one")); - default_headers.append(VARY, HeaderValue::from_static("two")); - process_vary_header(&mut default_headers); - let vary = default_headers.get_all(VARY); - assert_eq!(vary.iter().count(), 2); - for value in vary { - assert!(value == "one" || value == "two"); - } - } -} diff --git a/apollo-router/src/plugins/mod.rs b/apollo-router/src/plugins/mod.rs index dbfb642d2e..2c17969e29 100644 --- a/apollo-router/src/plugins/mod.rs +++ b/apollo-router/src/plugins/mod.rs @@ -24,7 +24,6 @@ pub(crate) mod authentication; pub(crate) mod authorization; pub(crate) mod cache; pub(crate) mod connectors; -pub(crate) mod content_negotiation; mod coprocessor; pub(crate) mod csrf; pub(crate) mod demand_control; diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index 661ca57fd3..1386348cf8 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -766,7 +766,6 @@ pub(crate) async fn create_plugins( add_optional_apollo_plugin!("experimental_response_cache"); add_mandatory_apollo_plugin!("progressive_override"); add_optional_apollo_plugin!("demand_control"); - add_mandatory_apollo_plugin!("content_negotiation"); // has to follow file_uploads // This relative ordering is documented in `docs/source/customizations/native.mdx`: add_optional_apollo_plugin!("connectors"); diff --git a/apollo-router/src/services/layers/apq.rs b/apollo-router/src/services/layers/apq.rs index 2d1a2d4556..b8c91a96a5 100644 --- a/apollo-router/src/services/layers/apq.rs +++ b/apollo-router/src/services/layers/apq.rs @@ -252,7 +252,7 @@ mod apq_tests { use crate::Context; use crate::error::Error; use crate::graphql::Response; - use crate::plugins::content_negotiation::ClientRequestAccepts; + use crate::services::router::ClientRequestAccepts; use crate::services::router::service::from_supergraph_mock_callback; use crate::services::router::service::from_supergraph_mock_callback_and_configuration; diff --git a/apollo-router/src/services/layers/content_negotiation.rs b/apollo-router/src/services/layers/content_negotiation.rs new file mode 100644 index 0000000000..84261a5513 --- /dev/null +++ b/apollo-router/src/services/layers/content_negotiation.rs @@ -0,0 +1,316 @@ +//! Layers that do HTTP content negotiation using the Accept and Content-Type headers. +//! +//! Content negotiation uses a pair of layers that work together at the router and supergraph stages. + +use std::ops::ControlFlow; + +use http::HeaderMap; +use http::Method; +use http::StatusCode; +use http::header::ACCEPT; +use http::header::CONTENT_TYPE; +use mediatype::MediaTypeList; +use mediatype::ReadParams; +use mediatype::names::_STAR; +use mediatype::names::APPLICATION; +use mediatype::names::JSON; +use mediatype::names::MIXED; +use mediatype::names::MULTIPART; +use mime::APPLICATION_JSON; +use tower::BoxError; +use tower::Layer; +use tower::Service; +use tower::ServiceExt; + +use crate::graphql; +use crate::layers::ServiceExt as _; +use crate::layers::sync_checkpoint::CheckpointService; +use crate::services::APPLICATION_JSON_HEADER_VALUE; +use crate::services::MULTIPART_DEFER_ACCEPT; +use crate::services::MULTIPART_DEFER_SPEC_PARAMETER; +use crate::services::MULTIPART_DEFER_SPEC_VALUE; +use crate::services::MULTIPART_SUBSCRIPTION_ACCEPT; +use crate::services::MULTIPART_SUBSCRIPTION_SPEC_PARAMETER; +use crate::services::MULTIPART_SUBSCRIPTION_SPEC_VALUE; +use crate::services::router; +use crate::services::router::ClientRequestAccepts; +use crate::services::router::service::MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE; +use crate::services::router::service::MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE; +use crate::services::supergraph; + +pub(crate) const GRAPHQL_JSON_RESPONSE_HEADER_VALUE: &str = "application/graphql-response+json"; + +/// A layer for the router service that rejects requests that do not have an expected Content-Type, +/// or that have an Accept header that is not supported by the router. +/// +/// In particular, the Content-Type must be JSON, and the Accept header must include */*, or one of +/// the JSON/GraphQL MIME types. +/// +/// # Context +/// If the request is valid, this layer adds a [`ClientRequestAccepts`] value to the context. +#[derive(Clone, Default)] +pub(crate) struct RouterLayer {} + +impl Layer for RouterLayer +where + S: Service + Send + 'static, + >::Future: Send + 'static, +{ + type Service = CheckpointService; + + fn layer(&self, service: S) -> Self::Service { + CheckpointService::new( + move |req| { + if req.router_request.method() != Method::GET + && !content_type_is_json(req.router_request.headers()) + { + let response = http::Response::builder() + .status(StatusCode::UNSUPPORTED_MEDIA_TYPE) + .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) + .body(router::body::from_bytes( + serde_json::json!({ + "errors": [ + graphql::Error::builder() + .message(format!( + r#"'content-type' header must be one of: {:?} or {:?}"#, + APPLICATION_JSON.essence_str(), + GRAPHQL_JSON_RESPONSE_HEADER_VALUE, + )) + .extension_code("INVALID_CONTENT_TYPE_HEADER") + .build() + ] + }) + .to_string(), + )) + .expect("cannot fail"); + + return Ok(ControlFlow::Break(response.into())); + } + + let accepts = parse_accept(req.router_request.headers()); + + if accepts.wildcard + || accepts.multipart_defer + || accepts.multipart_subscription + || accepts.json + { + req.context + .extensions() + .with_lock(|lock| lock.insert(accepts)); + + Ok(ControlFlow::Continue(req)) + } else { + let response = http::Response::builder() + .status(StatusCode::NOT_ACCEPTABLE) + .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) + .body(router::body::from_bytes( + serde_json::json!({ + "errors": [ + graphql::Error::builder() + .message(format!( + r#"'accept' header must be one of: \"*/*\", {:?}, {:?}, {:?} or {:?}"#, + APPLICATION_JSON.essence_str(), + GRAPHQL_JSON_RESPONSE_HEADER_VALUE, + MULTIPART_SUBSCRIPTION_ACCEPT, + MULTIPART_DEFER_ACCEPT + )) + .extension_code("INVALID_ACCEPT_HEADER") + .build() + ] + }) + .to_string() + )).expect("cannot fail"); + + Ok(ControlFlow::Break(response.into())) + } + }, + service, + ) + } +} + +/// A layer for the supergraph service that populates the Content-Type response header. +/// +/// The content type is decided based on the [`ClientRequestAccepts`] context value, which is +/// populated by the content negotiation [`RouterLayer`]. +// XXX(@goto-bus-stop): this feels a bit odd. It probably works fine because we can only ever respond +// with JSON, but maybe this should be done as close as possible to where we populate the response body..? +#[derive(Clone, Default)] +pub(crate) struct SupergraphLayer {} + +impl Layer for SupergraphLayer +where + S: Service + + Send + + 'static, + >::Future: Send + 'static, +{ + type Service = supergraph::BoxService; + + fn layer(&self, service: S) -> Self::Service { + service + .map_first_graphql_response(|context, mut parts, res| { + let ClientRequestAccepts { + wildcard: accepts_wildcard, + json: accepts_json, + multipart_defer: accepts_multipart_defer, + multipart_subscription: accepts_multipart_subscription, + } = context.extensions().with_lock(|lock| { + lock.get::() + .cloned() + .unwrap_or_default() + }); + + if !res.has_next.unwrap_or_default() && (accepts_json || accepts_wildcard) { + parts + .headers + .insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone()); + } else if accepts_multipart_defer { + parts.headers.insert( + CONTENT_TYPE, + MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE.clone(), + ); + } else if accepts_multipart_subscription { + parts.headers.insert( + CONTENT_TYPE, + MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE.clone(), + ); + } + (parts, res) + }) + .boxed() + } +} + +/// Returns true if the headers content type is `application/json` or `application/graphql-response+json` +fn content_type_is_json(headers: &HeaderMap) -> bool { + headers.get_all(CONTENT_TYPE).iter().any(|value| { + value + .to_str() + .map(|accept_str| { + let mut list = MediaTypeList::new(accept_str); + + list.any(|mime| { + mime.as_ref() + .map(|mime| { + (mime.ty == APPLICATION && mime.subty == JSON) + || (mime.ty == APPLICATION + && mime.subty.as_str() == "graphql-response" + && mime.suffix == Some(JSON)) + }) + .unwrap_or(false) + }) + }) + .unwrap_or(false) + }) +} +// Clippy suggests `for mime in MediaTypeList::new(str).flatten()` but less indentation +// does not seem worth making it invisible that Result is involved. +#[allow(clippy::manual_flatten)] +/// Returns (accepts_json, accepts_wildcard, accepts_multipart) +fn parse_accept(headers: &HeaderMap) -> ClientRequestAccepts { + let mut header_present = false; + let mut accepts = ClientRequestAccepts::default(); + for value in headers.get_all(ACCEPT) { + header_present = true; + if let Ok(str) = value.to_str() { + for result in MediaTypeList::new(str) { + if let Ok(mime) = result { + if !accepts.json + && ((mime.ty == APPLICATION && mime.subty == JSON) + || (mime.ty == APPLICATION + && mime.subty.as_str() == "graphql-response" + && mime.suffix == Some(JSON))) + { + accepts.json = true + } + if !accepts.wildcard && (mime.ty == _STAR && mime.subty == _STAR) { + accepts.wildcard = true + } + if !accepts.multipart_defer && (mime.ty == MULTIPART && mime.subty == MIXED) { + let parameter = mediatype::Name::new(MULTIPART_DEFER_SPEC_PARAMETER) + .expect("valid name"); + let value = + mediatype::Value::new(MULTIPART_DEFER_SPEC_VALUE).expect("valid value"); + if mime.get_param(parameter) == Some(value) { + accepts.multipart_defer = true + } + } + if !accepts.multipart_subscription + && (mime.ty == MULTIPART && mime.subty == MIXED) + { + let parameter = mediatype::Name::new(MULTIPART_SUBSCRIPTION_SPEC_PARAMETER) + .expect("valid name"); + let value = mediatype::Value::new(MULTIPART_SUBSCRIPTION_SPEC_VALUE) + .expect("valid value"); + if mime.get_param(parameter) == Some(value) { + accepts.multipart_subscription = true + } + } + } + } + } + } + if !header_present { + accepts.json = true + } + accepts +} + +#[cfg(test)] +mod tests { + use http::HeaderValue; + + use super::*; + + #[test] + fn it_checks_accept_header() { + let mut default_headers = HeaderMap::new(); + default_headers.insert( + ACCEPT, + HeaderValue::from_static(APPLICATION_JSON.essence_str()), + ); + default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); + let accepts = parse_accept(&default_headers); + assert!(accepts.json); + + let mut default_headers = HeaderMap::new(); + default_headers.insert(ACCEPT, HeaderValue::from_static("*/*")); + default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); + let accepts = parse_accept(&default_headers); + assert!(accepts.wildcard); + + let mut default_headers = HeaderMap::new(); + // real life browser example + default_headers.insert(ACCEPT, HeaderValue::from_static("text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8")); + let accepts = parse_accept(&default_headers); + assert!(accepts.wildcard); + + let mut default_headers = HeaderMap::new(); + default_headers.insert( + ACCEPT, + HeaderValue::from_static(GRAPHQL_JSON_RESPONSE_HEADER_VALUE), + ); + default_headers.append(ACCEPT, HeaderValue::from_static("foo/bar")); + let accepts = parse_accept(&default_headers); + assert!(accepts.json); + + let mut default_headers = HeaderMap::new(); + default_headers.insert( + ACCEPT, + HeaderValue::from_static(GRAPHQL_JSON_RESPONSE_HEADER_VALUE), + ); + default_headers.append(ACCEPT, HeaderValue::from_static(MULTIPART_DEFER_ACCEPT)); + let accepts = parse_accept(&default_headers); + assert!(accepts.multipart_defer); + + // Multiple accepted types, including one with a parameter we are interested in + let mut default_headers = HeaderMap::new(); + default_headers.insert( + ACCEPT, + HeaderValue::from_static("multipart/mixed;subscriptionSpec=1.0, application/json"), + ); + let accepts = parse_accept(&default_headers); + assert!(accepts.multipart_subscription); + } +} diff --git a/apollo-router/src/services/layers/mod.rs b/apollo-router/src/services/layers/mod.rs index ce3b6826ac..0741a189bc 100644 --- a/apollo-router/src/services/layers/mod.rs +++ b/apollo-router/src/services/layers/mod.rs @@ -1,6 +1,7 @@ //! Layers that are internal to the execution pipeline. pub(crate) mod allow_only_http_post_mutations; pub(crate) mod apq; +pub(crate) mod content_negotiation; pub(crate) mod persisted_queries; pub(crate) mod query_analysis; pub(crate) mod static_page; diff --git a/apollo-router/src/services/mod.rs b/apollo-router/src/services/mod.rs index 1c555b5fc3..cd4ab3589a 100644 --- a/apollo-router/src/services/mod.rs +++ b/apollo-router/src/services/mod.rs @@ -71,3 +71,16 @@ pub(crate) fn apollo_key() -> Option { pub(crate) fn apollo_graph_reference() -> Option { APOLLO_GRAPH_REF.lock().clone() } + +// set the supported `@defer` specification version to https://github.com/graphql/graphql-spec/pull/742/commits/01d7b98f04810c9a9db4c0e53d3c4d54dbf10b82 +pub(crate) const MULTIPART_DEFER_SPEC_PARAMETER: &str = "deferSpec"; +pub(crate) const MULTIPART_DEFER_SPEC_VALUE: &str = "20220824"; +pub(crate) const MULTIPART_DEFER_ACCEPT: &str = "multipart/mixed;deferSpec=20220824"; +pub(crate) const MULTIPART_DEFER_CONTENT_TYPE: &str = + "multipart/mixed;boundary=\"graphql\";deferSpec=20220824"; + +pub(crate) const MULTIPART_SUBSCRIPTION_ACCEPT: &str = "multipart/mixed;subscriptionSpec=1.0"; +pub(crate) const MULTIPART_SUBSCRIPTION_CONTENT_TYPE: &str = + "multipart/mixed;boundary=\"graphql\";subscriptionSpec=1.0"; +pub(crate) const MULTIPART_SUBSCRIPTION_SPEC_PARAMETER: &str = "subscriptionSpec"; +pub(crate) const MULTIPART_SUBSCRIPTION_SPEC_VALUE: &str = "1.0"; diff --git a/apollo-router/src/services/router.rs b/apollo-router/src/services/router.rs index 05c5f3bb3f..1f8eff6db8 100644 --- a/apollo-router/src/services/router.rs +++ b/apollo-router/src/services/router.rs @@ -23,14 +23,14 @@ use thiserror::Error; use tower::BoxError; use self::body::RouterBody; +use self::service::MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE; +use self::service::MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE; use super::supergraph; use crate::Context; use crate::context::CONTAINS_GRAPHQL_ERROR; use crate::graphql; use crate::http_ext::header_map; use crate::json_ext::Path; -use crate::plugins::content_negotiation::MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE; -use crate::plugins::content_negotiation::MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE; use crate::plugins::telemetry::config_new::router::events::RouterResponseBodyExtensionType; use crate::services::TryIntoHeaderName; use crate::services::TryIntoHeaderValue; @@ -415,6 +415,14 @@ impl Response { } } +#[derive(Clone, Default, Debug)] +pub(crate) struct ClientRequestAccepts { + pub(crate) multipart_defer: bool, + pub(crate) multipart_subscription: bool, + pub(crate) json: bool, + pub(crate) wildcard: bool, +} + impl From> for Response where T: http_body::Body + Send + 'static, diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 30685d10c9..e291282a26 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -13,11 +13,13 @@ use futures::future::join_all; use futures::future::ready; use futures::stream::StreamExt; use futures::stream::once; +use http::HeaderMap; use http::HeaderName; use http::HeaderValue; use http::Method; use http::StatusCode; use http::header::CONTENT_TYPE; +use http::header::VARY; use http::request::Parts; use mime::APPLICATION_JSON; use multimap::MultiMap; @@ -25,6 +27,7 @@ use opentelemetry::KeyValue; use opentelemetry_semantic_conventions::trace::HTTP_REQUEST_METHOD; use serde_json_bytes::Value; use tower::BoxError; +use tower::Layer; use tower::ServiceBuilder; use tower::ServiceExt; use tower::buffer::Buffer; @@ -32,6 +35,7 @@ use tower_service::Service; use tracing::Instrument; use super::Body; +use super::ClientRequestAccepts; use crate::Configuration; use crate::Context; use crate::Endpoint; @@ -50,8 +54,6 @@ use crate::metrics::count_operation_error_codes; use crate::metrics::count_operation_errors; #[cfg(test)] use crate::plugin::test::MockSupergraphService; -use crate::plugins::content_negotiation::ClientRequestAccepts; -use crate::plugins::content_negotiation::invalid_accept_header_response; use crate::plugins::telemetry::apollo::Config as ApolloTelemetryConfig; use crate::plugins::telemetry::apollo::ErrorsConfiguration; use crate::plugins::telemetry::config::Conf as TelemetryConfig; @@ -66,14 +68,21 @@ use crate::protocols::multipart::Multipart; use crate::protocols::multipart::ProtocolMode; use crate::query_planner::InMemoryCachePlanner; use crate::router_factory::RouterFactory; +use crate::services::APPLICATION_JSON_HEADER_VALUE; use crate::services::HasPlugins; use crate::services::HasSchema; +use crate::services::MULTIPART_DEFER_ACCEPT; +use crate::services::MULTIPART_DEFER_CONTENT_TYPE; +use crate::services::MULTIPART_SUBSCRIPTION_ACCEPT; +use crate::services::MULTIPART_SUBSCRIPTION_CONTENT_TYPE; use crate::services::RouterRequest; use crate::services::RouterResponse; use crate::services::SupergraphCreator; use crate::services::SupergraphRequest; use crate::services::SupergraphResponse; use crate::services::layers::apq::APQLayer; +use crate::services::layers::content_negotiation; +use crate::services::layers::content_negotiation::GRAPHQL_JSON_RESPONSE_HEADER_VALUE; use crate::services::layers::persisted_queries::PersistedQueryLayer; use crate::services::layers::query_analysis::QueryAnalysisLayer; use crate::services::layers::static_page::StaticPageLayer; @@ -84,8 +93,13 @@ use crate::services::router::pipeline_handle::PipelineRef; use crate::services::supergraph; use crate::spec::query::EXTENSIONS_VALUE_COMPLETION_KEY; +pub(crate) static MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE: HeaderValue = + HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE); +pub(crate) static MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE: HeaderValue = + HeaderValue::from_static(MULTIPART_SUBSCRIPTION_CONTENT_TYPE); static ACCEL_BUFFERING_HEADER_NAME: HeaderName = HeaderName::from_static("x-accel-buffering"); static ACCEL_BUFFERING_HEADER_VALUE: HeaderValue = HeaderValue::from_static("no"); +static ORIGIN_HEADER_VALUE: HeaderValue = HeaderValue::from_static("origin"); /// Containing [`Service`] in the request lifecyle. #[derive(Clone)] @@ -277,6 +291,9 @@ impl RouterService { }, }; + // XXX(@goto-bus-stop): *all* of the code using these `accepts_` variables looks like it + // duplicates what the content_negotiation::SupergraphLayer is doing. We should delete one + // or the other, and absolutely not do it inline here. let ClientRequestAccepts { wildcard: accepts_wildcard, json: accepts_json, @@ -288,6 +305,7 @@ impl RouterService { .unwrap_or_default(); let (mut parts, mut body) = response.into_parts(); + process_vary_header(&mut parts.headers); if context .extensions() @@ -335,6 +353,10 @@ impl RouterService { &self.apollo_telemetry_config.errors, ); } + + parts + .headers + .insert(CONTENT_TYPE, APPLICATION_JSON_HEADER_VALUE.clone()); let body: Result = tracing::trace_span!("serialize_response") .in_scope(|| { let body = serde_json::to_string(&response)?; @@ -359,6 +381,18 @@ impl RouterService { .context(context) .build() } else if accepts_multipart_defer || accepts_multipart_subscription { + if accepts_multipart_defer { + parts.headers.insert( + CONTENT_TYPE, + MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE.clone(), + ); + } else if accepts_multipart_subscription { + parts.headers.insert( + CONTENT_TYPE, + MULTIPART_SUBSCRIPTION_CONTENT_TYPE_HEADER_VALUE.clone(), + ); + } + if !response.errors.is_empty() { count_operation_errors( &response.errors, @@ -371,33 +405,24 @@ impl RouterService { ACCEL_BUFFERING_HEADER_NAME.clone(), ACCEL_BUFFERING_HEADER_VALUE.clone(), ); - - // NB: here is where we decide what kind of streaming response we're going to - // send. insert it into the extensions so that the content negotiation plugin - // can read it. - let protocol_mode = if matches!(response.subscribed, Some(true)) { - ProtocolMode::Subscription - } else { - ProtocolMode::Defer - }; - context - .extensions() - .with_lock(|lock| lock.insert(protocol_mode)); - - let response_multipart = match protocol_mode { - ProtocolMode::Subscription => Multipart::new(body, protocol_mode), - ProtocolMode::Defer => { - Multipart::new(once(ready(response)).chain(body), protocol_mode) - } + let response = match response.subscribed { + Some(true) => http::Response::from_parts( + parts, + router::body::from_result_stream(Multipart::new( + body, + ProtocolMode::Subscription, + )), + ), + _ => http::Response::from_parts( + parts, + router::body::from_result_stream(Multipart::new( + once(ready(response)).chain(body), + ProtocolMode::Defer, + )), + ), }; - RouterResponse::http_response_builder() - .response(http::Response::from_parts( - parts, - router::body::from_result_stream(response_multipart), - )) - .context(context) - .build() + Ok(RouterResponse { response, context }) } else { count_operation_error_codes( &["INVALID_ACCEPT_HEADER"], @@ -406,7 +431,23 @@ impl RouterService { ); // this should be unreachable due to a previous check, but just to be sure... - Ok(invalid_accept_header_response().into()) + Ok(router::Response::error_builder() + .error( + graphql::Error::builder() + .message(format!( + r#"'accept' header must be one of: \"*/*\", {:?}, {:?}, {:?} or {:?}"#, + APPLICATION_JSON.essence_str(), + GRAPHQL_JSON_RESPONSE_HEADER_VALUE, + MULTIPART_DEFER_ACCEPT, + MULTIPART_SUBSCRIPTION_ACCEPT, + )) + .extension_code("INVALID_ACCEPT_HEADER") + .build(), + ) + .status_code(StatusCode::NOT_ACCEPTABLE) + .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) + .context(context) + .build()?) } } } @@ -775,59 +816,57 @@ impl RouterService { parts: &Parts, body: Body, ) -> Result, bool), TranslateError>, BoxError> { - let graphql_requests: Result<(Vec, bool), TranslateError> = if parts - .method - == Method::GET - { - self.translate_query_request(parts).await - } else { - let bytes = router::body::into_bytes(body) - .instrument(tracing::debug_span!("receive_body")) - .await?; - if let Some(level) = context - .extensions() - .with_lock(|ext| ext.get::().cloned()) - .map(|d| d.0) - { - let mut attrs = Vec::with_capacity(5); - #[cfg(test)] - let mut headers: indexmap::IndexMap = parts - .headers - .clone() - .into_iter() - .filter_map(|(name, val)| Some((name?.to_string(), val))) - .collect(); - #[cfg(test)] - headers.sort_keys(); - #[cfg(not(test))] - let headers = &parts.headers; - - attrs.push(KeyValue::new( - HTTP_REQUEST_HEADERS, - opentelemetry::Value::String(format!("{:?}", headers).into()), - )); - attrs.push(KeyValue::new( - HTTP_REQUEST_METHOD, - opentelemetry::Value::String(format!("{}", parts.method).into()), - )); - attrs.push(KeyValue::new( - HTTP_REQUEST_URI, - opentelemetry::Value::String(format!("{}", parts.uri).into()), - )); - attrs.push(KeyValue::new( - HTTP_REQUEST_VERSION, - opentelemetry::Value::String(format!("{:?}", parts.version).into()), - )); - attrs.push(KeyValue::new( - HTTP_REQUEST_BODY, - opentelemetry::Value::String( - format!("{:?}", String::from_utf8_lossy(&bytes)).into(), - ), - )); - log_event(level, "router.request", attrs, ""); - } - self.translate_bytes_request(&bytes) - }; + let graphql_requests: Result<(Vec, bool), TranslateError> = + if parts.method == Method::GET { + self.translate_query_request(parts).await + } else { + let bytes = router::body::into_bytes(body) + .instrument(tracing::debug_span!("receive_body")) + .await?; + if let Some(level) = context + .extensions() + .with_lock(|ext| ext.get::().cloned()) + .map(|d| d.0) + { + let mut attrs = Vec::with_capacity(5); + #[cfg(test)] + let mut headers: indexmap::IndexMap = parts + .headers + .clone() + .into_iter() + .filter_map(|(name, val)| Some((name?.to_string(), val))) + .collect(); + #[cfg(test)] + headers.sort_keys(); + #[cfg(not(test))] + let headers = &parts.headers; + + attrs.push(KeyValue::new( + HTTP_REQUEST_HEADERS, + opentelemetry::Value::String(format!("{:?}", headers).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_METHOD, + opentelemetry::Value::String(format!("{}", parts.method).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_URI, + opentelemetry::Value::String(format!("{}", parts.uri).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_VERSION, + opentelemetry::Value::String(format!("{:?}", parts.version).into()), + )); + attrs.push(KeyValue::new( + HTTP_REQUEST_BODY, + opentelemetry::Value::String( + format!("{:?}", String::from_utf8_lossy(&bytes)).into(), + ), + )); + log_event(level, "router.request", attrs, ""); + } + self.translate_bytes_request(&bytes) + }; Ok(graphql_requests) } @@ -853,6 +892,14 @@ struct TranslateError { extension_details: String, } +// Process the headers to make sure that `VARY` is set correctly +pub(crate) fn process_vary_header(headers: &mut HeaderMap) { + if headers.get(VARY).is_none() { + // We don't have a VARY header, add one with value "origin" + headers.insert(VARY, ORIGIN_HEADER_VALUE.clone()); + } +} + /// A collection of services and data which may be used to create a "router". #[derive(Clone)] pub(crate) struct RouterCreator { @@ -925,14 +972,14 @@ impl RouterCreator { let config_hash = configuration.hash(); let pipeline_handle = PipelineHandle::new(schema_id, launch_id, config_hash); - let router_service = RouterService::new( + let router_service = content_negotiation::RouterLayer::default().layer(RouterService::new( supergraph_creator.create(), apq_layer, persisted_query_layer, query_analysis_layer, configuration.batching.clone(), TelemetryConfig::apollo(&configuration), - ); + )); // NOTE: This is the start of the router pipeline (router_service) let sb = Buffer::new( diff --git a/apollo-router/src/services/router/tests.rs b/apollo-router/src/services/router/tests.rs index 16b29a4d4b..776197818b 100644 --- a/apollo-router/src/services/router/tests.rs +++ b/apollo-router/src/services/router/tests.rs @@ -1,10 +1,13 @@ use std::sync::Arc; use futures::stream::StreamExt; +use http::HeaderMap; +use http::HeaderValue; use http::Method; use http::Request; use http::Uri; use http::header::CONTENT_TYPE; +use http::header::VARY; use mime::APPLICATION_JSON; use opentelemetry::KeyValue; use parking_lot::Mutex; @@ -19,21 +22,58 @@ use crate::context::OPERATION_NAME; use crate::graphql; use crate::json_ext::Path; use crate::metrics::FutureMetricsExt; -use crate::plugins::content_negotiation::MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE; use crate::plugins::telemetry::CLIENT_NAME; use crate::plugins::telemetry::CLIENT_VERSION; use crate::query_planner::APOLLO_OPERATION_ID; +use crate::services::MULTIPART_DEFER_CONTENT_TYPE; use crate::services::SupergraphRequest; use crate::services::SupergraphResponse; use crate::services::router; use crate::services::router::body::RouterBody; use crate::services::router::service::from_supergraph_mock_callback; use crate::services::router::service::from_supergraph_mock_callback_and_configuration; +use crate::services::router::service::process_vary_header; use crate::services::subgraph; use crate::services::supergraph; use crate::spec::query::EXTENSIONS_VALUE_COMPLETION_KEY; use crate::test_harness::make_fake_batch; +// Test Vary processing + +#[test] +fn it_adds_default_with_value_origin_if_no_vary_header() { + let mut default_headers = HeaderMap::new(); + process_vary_header(&mut default_headers); + let vary_opt = default_headers.get(VARY); + assert!(vary_opt.is_some()); + let vary = vary_opt.expect("has a value"); + assert_eq!(vary, "origin"); +} + +#[test] +fn it_leaves_vary_alone_if_set() { + let mut default_headers = HeaderMap::new(); + default_headers.insert(VARY, HeaderValue::from_static("*")); + process_vary_header(&mut default_headers); + let vary_opt = default_headers.get(VARY); + assert!(vary_opt.is_some()); + let vary = vary_opt.expect("has a value"); + assert_eq!(vary, "*"); +} + +#[test] +fn it_leaves_varys_alone_if_there_are_more_than_one() { + let mut default_headers = HeaderMap::new(); + default_headers.insert(VARY, HeaderValue::from_static("one")); + default_headers.append(VARY, HeaderValue::from_static("two")); + process_vary_header(&mut default_headers); + let vary = default_headers.get_all(VARY); + assert_eq!(vary.iter().count(), 2); + for value in vary { + assert!(value == "one" || value == "two"); + } +} + #[tokio::test] async fn it_extracts_query_and_operation_name() { let query = "query"; @@ -404,10 +444,7 @@ async fn it_will_process_a_non_batched_defered_query() { } "; let http_request = supergraph::Request::canned_builder() - .header( - http::header::ACCEPT, - MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE, - ) + .header(http::header::ACCEPT, MULTIPART_DEFER_CONTENT_TYPE) .query(query) .build() .unwrap() @@ -464,10 +501,7 @@ async fn it_will_not_process_a_batched_deferred_query() { "; let http_request = make_fake_batch( supergraph::Request::canned_builder() - .header( - http::header::ACCEPT, - MULTIPART_DEFER_CONTENT_TYPE_HEADER_VALUE, - ) + .header(http::header::ACCEPT, MULTIPART_DEFER_CONTENT_TYPE) .query(query) .build() .unwrap() diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 697d6e665c..c3e1a909d7 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -43,6 +43,7 @@ use uuid::Uuid; use super::Plugins; use super::http::HttpClientServiceFactory; use super::http::HttpRequest; +use super::layers::content_negotiation::GRAPHQL_JSON_RESPONSE_HEADER_VALUE; use super::router::body::RouterBody; use super::subgraph::SubgraphRequestId; use crate::Configuration; @@ -61,7 +62,6 @@ use crate::graphql; use crate::json_ext::Object; use crate::layers::DEFAULT_BUFFER_SIZE; use crate::plugins::authentication::subgraph::SigningParamsConfig; -use crate::plugins::content_negotiation::APPLICATION_GRAPHQL_JSON; use crate::plugins::file_uploads; use crate::plugins::subscription::CallbackMode; use crate::plugins::subscription::SUBSCRIPTION_WS_CUSTOM_CONNECTION_PARAMS; @@ -1461,7 +1461,7 @@ fn get_graphql_content_type(service_name: &str, parts: &Parts) -> Result) -> Result, Infallible> { Ok(http::Response::builder() - .header( - CONTENT_TYPE, - HeaderValue::from_static(APPLICATION_GRAPHQL_JSON), - ) + .header(CONTENT_TYPE, GRAPHQL_JSON_RESPONSE_HEADER_VALUE) .status(StatusCode::UNAUTHORIZED) .body(r#"invalid"#.into()) .unwrap()) @@ -1847,10 +1843,7 @@ mod tests { async fn emulate_subgraph_application_graphql_response(listener: TcpListener) { async fn handle(_request: http::Request) -> Result, Infallible> { Ok(http::Response::builder() - .header( - CONTENT_TYPE, - HeaderValue::from_static(APPLICATION_GRAPHQL_JSON), - ) + .header(CONTENT_TYPE, GRAPHQL_JSON_RESPONSE_HEADER_VALUE) .status(StatusCode::OK) .body(r#"{"data": null}"#.into()) .unwrap()) diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index 5049cbfc58..74022ebc39 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -44,7 +44,6 @@ use crate::plugin::DynPlugin; use crate::plugins::authentication::APOLLO_AUTHENTICATION_JWT_CLAIMS; use crate::plugins::connectors::query_plans::store_connectors; use crate::plugins::connectors::query_plans::store_connectors_labels; -use crate::plugins::content_negotiation::ClientRequestAccepts; use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN; use crate::plugins::subscription::Subscription; use crate::plugins::subscription::SubscriptionConfig; @@ -76,10 +75,12 @@ use crate::services::execution::QueryPlan; use crate::services::fetch_service::FetchServiceFactory; use crate::services::http::HttpClientServiceFactory; use crate::services::layers::allow_only_http_post_mutations::AllowOnlyHttpPostMutationsLayer; +use crate::services::layers::content_negotiation; use crate::services::layers::persisted_queries::PersistedQueryLayer; use crate::services::layers::query_analysis::QueryAnalysisLayer; use crate::services::new_service::ServiceFactory; use crate::services::query_planner; +use crate::services::router::ClientRequestAccepts; use crate::services::subgraph::BoxGqlStream; use crate::services::subgraph_service::MakeSubgraphService; use crate::services::supergraph; @@ -983,6 +984,7 @@ impl PluggableSupergraphServiceBuilder { let sb = Buffer::new( ServiceBuilder::new() + .layer(content_negotiation::SupergraphLayer::default()) .service( self.plugins .iter() diff --git a/apollo-router/src/services/supergraph/tests.rs b/apollo-router/src/services/supergraph/tests.rs index e4f8af792e..9c7870283e 100644 --- a/apollo-router/src/services/supergraph/tests.rs +++ b/apollo-router/src/services/supergraph/tests.rs @@ -12,7 +12,7 @@ use crate::Notify; use crate::TestHarness; use crate::graphql; use crate::plugin::test::MockSubgraph; -use crate::plugins::content_negotiation::ClientRequestAccepts; +use crate::services::router::ClientRequestAccepts; use crate::services::subgraph; use crate::services::supergraph; use crate::spec::Schema; diff --git a/apollo-router/tests/integration/content_negotiation.rs b/apollo-router/tests/integration/content_negotiation.rs deleted file mode 100644 index 2a2f66856e..0000000000 --- a/apollo-router/tests/integration/content_negotiation.rs +++ /dev/null @@ -1,44 +0,0 @@ -use std::collections::HashMap; - -use http::HeaderValue; -use serde_json::json; -use tower::BoxError; - -use crate::integration::IntegrationTest; -use crate::integration::common::Query; - -#[tokio::test(flavor = "multi_thread")] -async fn test_content_negotiation() -> Result<(), BoxError> { - let mut router = IntegrationTest::builder() - .config("supergraph:") - .build() - .await; - - router.start().await; - router.assert_started().await; - - let query = json!({"query": "{ __typename }"}); - for accept_header in [ - "application/json", - "application/json,multipart/mixed;subscriptionSpec=1.0", - ] { - let (_, response) = router - .execute_query( - Query::builder() - .body(query.clone()) - .headers(HashMap::from([( - "accept".to_string(), - accept_header.to_string(), - )])) - .build(), - ) - .await; - assert_eq!(response.status(), 200); - assert_eq!( - response.headers().get("content-type").unwrap(), - HeaderValue::from_str("application/json").unwrap() - ); - } - - Ok(()) -} diff --git a/apollo-router/tests/integration/mod.rs b/apollo-router/tests/integration/mod.rs index ca6ffc734c..042056b137 100644 --- a/apollo-router/tests/integration/mod.rs +++ b/apollo-router/tests/integration/mod.rs @@ -4,7 +4,6 @@ pub(crate) mod common; pub(crate) use common::IntegrationTest; mod connectors; -mod content_negotiation; mod coprocessor; mod docs; // In the CI environment we only install Redis on x86_64 Linux