diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index ad640703df..fe4ce6a91d 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -59,7 +59,7 @@ use crate::http_server_factory::Listener; use crate::plugins::telemetry::formatters::TRACE_ID_FIELD_NAME; use crate::router::ApolloRouterError; use crate::router_factory::Endpoint; -use crate::router_factory::SupergraphServiceFactory; +use crate::router_factory::TransportServiceFactory; use crate::services::layers::apq::APQLayer; use crate::services::transport; use crate::tracer::TraceId; @@ -95,7 +95,7 @@ pub(crate) fn make_axum_router( apq: APQLayer, ) -> Result where - RF: SupergraphServiceFactory, + RF: TransportServiceFactory, { ensure_listenaddrs_consistency(configuration, &endpoints)?; @@ -115,7 +115,8 @@ where async move { Ok(http::Response::builder() - .body(serde_json::to_vec(&health).map_err(BoxError::from)?.into())?) + .body(serde_json::to_vec(&health).map_err(BoxError::from)?.into())? + .into()) } }) .boxed(), @@ -160,7 +161,7 @@ impl HttpServerFactory for AxumHttpServerFactory { extra_endpoints: MultiMap, ) -> Self::Future where - RF: SupergraphServiceFactory, + RF: TransportServiceFactory, { Box::pin(async move { let apq = APQLayer::with_cache(DeduplicatingCache::new().await); @@ -292,15 +293,17 @@ fn main_endpoint( apq: APQLayer, ) -> Result where - RF: SupergraphServiceFactory, + RF: TransportServiceFactory, { let cors = configuration.cors.clone().into_layer().map_err(|e| { ApolloRouterError::ServiceCreationError(format!("CORS configuration error: {e}").into()) })?; let main_route = main_router::(configuration, apq) + // .layer(my_http_service_stack_except_its_layered_now) .layer(middleware::from_fn(decompress_request_body)) .layer( + // TODO: move it to the telemetry plugin. TraceLayer::new_for_http() .make_span_with(PropagatingMakeSpan::new()) .on_request(|_: &Request<_>, span: &Span| { @@ -344,7 +347,7 @@ where pub(super) fn main_router(configuration: &Configuration, apq: APQLayer) -> axum::Router where - RF: SupergraphServiceFactory, + RF: TransportServiceFactory, { let mut graphql_configuration = configuration.supergraph.clone(); if graphql_configuration.path.ends_with("/*") { @@ -391,14 +394,14 @@ where .post({ move |host: Host, uri: OriginalUri, - request: Json, + http_request: Request, Extension(service): Extension, header_map: HeaderMap| { { handle_post( host, uri, - request, + http_request, apq, service.new_service().boxed(), header_map, diff --git a/apollo-router/src/axum_factory/handlers.rs b/apollo-router/src/axum_factory/handlers.rs index 0ea723faf7..10cc60c337 100644 --- a/apollo-router/src/axum_factory/handlers.rs +++ b/apollo-router/src/axum_factory/handlers.rs @@ -36,72 +36,70 @@ use crate::services::layers::apq::APQLayer; use crate::services::MULTIPART_DEFER_CONTENT_TYPE; use crate::SupergraphRequest; use crate::SupergraphResponse; +use crate::TransportRequest; +use crate::TransportResponse; pub(super) async fn handle_get_with_static( static_page: Bytes, Host(host): Host, apq: APQLayer, - service: BoxService, + service: BoxService, http_request: Request, ) -> impl IntoResponse { + // TODO: deal with transport* BEFORE + // TODO: Let users know the http_service is not CORS / CSRF safe if prefers_html(http_request.headers()) { return Html(static_page).into_response(); } - if let Some(request) = http_request - .uri() - .query() - .and_then(|q| graphql::Request::from_urlencoded_query(q.to_string()).ok()) - { - let mut http_request = http_request.map(|_| request); - *http_request.uri_mut() = Uri::from_str(&format!("http://{}{}", host, http_request.uri())) - .expect("the URL is already valid because it comes from axum; qed"); - return run_graphql_request(service, apq, http_request) - .await - .into_response(); - } + return run_graphql_request(service, apq, http_request) + .await + .into_response(); + // if let Some(request) = http_request + // .uri() + // .query() + // .and_then(|q| graphql::Request::from_urlencoded_query(q.to_string()).ok()) + // { + // let mut http_request = http_request.map(|_| request); + // *http_request.uri_mut() = Uri::from_str(&format!("http://{}{}", host, http_request.uri())) + // .expect("the URL is already valid because it comes from axum; qed"); + + // } - (StatusCode::BAD_REQUEST, "Invalid GraphQL request").into_response() + // (StatusCode::BAD_REQUEST, "Invalid GraphQL request").into_response() } pub(super) async fn handle_get( Host(host): Host, apq: APQLayer, - service: BoxService, + service: BoxService, http_request: Request, ) -> impl IntoResponse { - if let Some(request) = http_request - .uri() - .query() - .and_then(|q| graphql::Request::from_urlencoded_query(q.to_string()).ok()) - { - let mut http_request = http_request.map(|_| request); - *http_request.uri_mut() = Uri::from_str(&format!("http://{}{}", host, http_request.uri())) - .expect("the URL is already valid because it comes from axum; qed"); - return run_graphql_request(service, apq, http_request) - .await - .into_response(); - } + return run_graphql_request(service, apq, http_request) + .await + .into_response(); + // if let Some(request) = http_request + // .uri() + // .query() + // .and_then(|q| graphql::Request::from_urlencoded_query(q.to_string()).ok()) + // { + // let mut http_request = http_request.map(|_| request); + // *http_request.uri_mut() = Uri::from_str(&format!("http://{}{}", host, http_request.uri())) + // .expect("the URL is already valid because it comes from axum; qed"); - (StatusCode::BAD_REQUEST, "Invalid Graphql request").into_response() + // } + + // (StatusCode::BAD_REQUEST, "Invalid Graphql request").into_response() } pub(super) async fn handle_post( Host(host): Host, OriginalUri(uri): OriginalUri, - Json(request): Json, + http_request: Request, apq: APQLayer, - service: BoxService, + service: BoxService, header_map: HeaderMap, ) -> impl IntoResponse { - let mut http_request = Request::post( - Uri::from_str(&format!("http://{}{}", host, uri)) - .expect("the URL is already valid because it comes from axum; qed"), - ) - .body(request) - .expect("body has already been parsed; qed"); - *http_request.headers_mut() = header_map; - run_graphql_request(service, apq, http_request) .await .into_response() @@ -110,38 +108,40 @@ pub(super) async fn handle_post( async fn run_graphql_request( service: RS, apq: APQLayer, - http_request: Request, + http_request: Request, ) -> impl IntoResponse where - RS: Service + Send, + RS: Service + Send, { let (head, body) = http_request.into_parts(); - let mut req: SupergraphRequest = Request::from_parts(head, body).into(); - req = match apq.apq_request(req).await { - Ok(req) => req, - Err(res) => { - let (parts, mut stream) = res.response.into_parts(); + let req: TransportRequest = Request::from_parts(head, body).into(); - return match stream.next().await { - None => { - tracing::error!("router service is not available to process request",); - ( - StatusCode::SERVICE_UNAVAILABLE, - "router service is not available to process request", - ) - .into_response() - } - Some(body) => http_ext::Response::from(http::Response::from_parts(parts, body)) - .into_response(), - }; - } - }; + // TODO: Put apq in the right spot + // req = match apq.apq_request(req).await { + // Ok(req) => req, + // Err(res) => { + // let (parts, mut stream) = res.response.into_parts(); + + // return match stream.next().await { + // None => { + // tracing::error!("router service is not available to process request",); + // ( + // StatusCode::SERVICE_UNAVAILABLE, + // "router service is not available to process request", + // ) + // .into_response() + // } + // Some(body) => http_ext::Response::from(http::Response::from_parts(parts, body)) + // .into_response(), + // }; + // } + // }; match service.ready_oneshot().await { Ok(mut service) => { - let accepts_multipart = accepts_multipart(req.supergraph_request.headers()); - let accepts_json = accepts_json(req.supergraph_request.headers()); - let accepts_wildcard = accepts_wildcard(req.supergraph_request.headers()); + let accepts_multipart = accepts_multipart(req.http_request.headers()); + let accepts_json = accepts_json(req.http_request.headers()); + let accepts_wildcard = accepts_wildcard(req.http_request.headers()); match service.call(req).await { Err(e) => { @@ -161,85 +161,87 @@ where .into_response() } Ok(response) => { - let (mut parts, mut stream) = response.response.into_parts(); + response.response.into_response() + // // TODO: Maybe this belongs to http_service + // let (mut parts, mut stream) = response.response.into_parts(); - process_vary_header(&mut parts.headers); + // process_vary_header(&mut parts.headers); - match stream.next().await { - None => { - tracing::error!("router service is not available to process request",); - ( - StatusCode::SERVICE_UNAVAILABLE, - "router service is not available to process request", - ) - .into_response() - } - Some(response) => { - if !response.has_next.unwrap_or(false) - && (accepts_json || accepts_wildcard) - { - parts.headers.insert( - CONTENT_TYPE, - HeaderValue::from_static("application/json"), - ); - tracing::trace_span!("serialize_response").in_scope(|| { - http_ext::Response::from(http::Response::from_parts( - parts, response, - )) - .into_response() - }) - } else if accepts_multipart { - parts.headers.insert( - CONTENT_TYPE, - HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE), - ); + // match stream.next().await { + // None => { + // tracing::error!("router service is not available to process request",); + // ( + // StatusCode::SERVICE_UNAVAILABLE, + // "router service is not available to process request", + // ) + // .into_response() + // } + // Some(response) => { + // if !response.has_next.unwrap_or(false) + // && (accepts_json || accepts_wildcard) + // { + // parts.headers.insert( + // CONTENT_TYPE, + // HeaderValue::from_static("application/json"), + // ); + // tracing::trace_span!("serialize_response").in_scope(|| { + // http_ext::Response::from(http::Response::from_parts( + // parts, response, + // )) + // .into_response() + // }) + // } else if accepts_multipart { + // parts.headers.insert( + // CONTENT_TYPE, + // HeaderValue::from_static(MULTIPART_DEFER_CONTENT_TYPE), + // ); - // each chunk contains a response and the next delimiter, to let client parsers - // know that they can process the response right away - let mut first_buf = Vec::from( - &b"\r\n--graphql\r\ncontent-type: application/json\r\n\r\n"[..], - ); - serde_json::to_writer(&mut first_buf, &response).unwrap(); - if response.has_next.unwrap_or(false) { - first_buf.extend_from_slice(b"\r\n--graphql\r\n"); - } else { - first_buf.extend_from_slice(b"\r\n--graphql--\r\n"); - } + // // each chunk contains a response and the next delimiter, to let client parsers + // // know that they can process the response right away + // let mut first_buf = Vec::from( + // &b"\r\n--graphql\r\ncontent-type: application/json\r\n\r\n"[..], + // ); + // serde_json::to_writer(&mut first_buf, &response).unwrap(); + // if response.has_next.unwrap_or(false) { + // first_buf.extend_from_slice(b"\r\n--graphql\r\n"); + // } else { + // first_buf.extend_from_slice(b"\r\n--graphql--\r\n"); + // } - let body = once(ready(Ok(Bytes::from(first_buf)))).chain( - stream.map(|res| { - let mut buf = Vec::from( - &b"content-type: application/json\r\n\r\n"[..], - ); - serde_json::to_writer(&mut buf, &res).unwrap(); + // let body = once(ready(Ok(Bytes::from(first_buf)))).chain( + // stream.map(|res| { + // let mut buf = Vec::from( + // &b"content-type: application/json\r\n\r\n"[..], + // ); + // serde_json::to_writer(&mut buf, &res).unwrap(); - // the last chunk has a different end delimiter - if res.has_next.unwrap_or(false) { - buf.extend_from_slice(b"\r\n--graphql\r\n"); - } else { - buf.extend_from_slice(b"\r\n--graphql--\r\n"); - } + // // the last chunk has a different end delimiter + // if res.has_next.unwrap_or(false) { + // buf.extend_from_slice(b"\r\n--graphql\r\n"); + // } else { + // buf.extend_from_slice(b"\r\n--graphql--\r\n"); + // } - Ok::<_, BoxError>(buf.into()) - }), - ); + // Ok::<_, BoxError>(buf.into()) + // }), + // ); - (parts, StreamBody::new(body)).into_response() - } else { - // this should be unreachable due to a previous check, but just to be sure... - ( - StatusCode::NOT_ACCEPTABLE, - format!( - r#"'accept' header can't be different than \"*/*\", {:?}, {:?} or {:?}"#, - APPLICATION_JSON_HEADER_VALUE, - GRAPHQL_JSON_RESPONSE_HEADER_VALUE, - MULTIPART_DEFER_CONTENT_TYPE - ), - ) - .into_response() - } - } - } + // (parts, StreamBody::new(body)).into_response() + // } else { + // // this should be unreachable due to a previous check, but just to be sure... + // ( + // StatusCode::NOT_ACCEPTABLE, + // format!( + // r#"'accept' header can't be different than \"*/*\", {:?}, {:?} or {:?}"#, + // APPLICATION_JSON_HEADER_VALUE, + // GRAPHQL_JSON_RESPONSE_HEADER_VALUE, + // MULTIPART_DEFER_CONTENT_TYPE + // ), + // ) + // .into_response() + // } + // } + // } } } } diff --git a/apollo-router/src/axum_factory/tests.rs b/apollo-router/src/axum_factory/tests.rs index 8b41246005..c5e1a53cee 100644 --- a/apollo-router/src/axum_factory/tests.rs +++ b/apollo-router/src/axum_factory/tests.rs @@ -56,7 +56,7 @@ use crate::http_server_factory::HttpServerFactory; use crate::http_server_factory::HttpServerHandle; use crate::json_ext::Path; use crate::router_factory::Endpoint; -use crate::router_factory::SupergraphServiceFactory; +use crate::router_factory::TransportServiceFactory; use crate::services::new_service::NewService; use crate::services::transport; use crate::services::SupergraphRequest; @@ -119,11 +119,11 @@ mock! { type MockSupergraphServiceType = tower_test::mock::Mock; #[derive(Clone)] -struct TestSupergraphServiceFactory { +struct TestTransportServiceFactory { inner: MockSupergraphServiceType, } -impl NewService for TestSupergraphServiceFactory { +impl NewService for TestTransportServiceFactory { type Service = MockSupergraphServiceType; fn new_service(&self) -> Self::Service { @@ -131,11 +131,11 @@ impl NewService for TestSupergraphServiceFactory { } } -impl SupergraphServiceFactory for TestSupergraphServiceFactory { +impl TransportServiceFactory for TestTransportServiceFactory { type SupergraphService = MockSupergraphServiceType; type Future = - <>::Service as Service< + <>::Service as Service< SupergraphRequest, >>::Future; @@ -160,7 +160,7 @@ async fn init(mut mock: MockSupergraphService) -> (HttpServerHandle, Client) { }); let server = server_factory .create( - TestSupergraphServiceFactory { + TestTransportServiceFactory { inner: service.into_inner(), }, Arc::new( @@ -221,7 +221,7 @@ pub(super) async fn init_with_config( }); let server = server_factory .create( - TestSupergraphServiceFactory { + TestTransportServiceFactory { inner: service.into_inner(), }, Arc::new(conf), @@ -263,7 +263,7 @@ async fn init_unix( server_factory .create( - TestSupergraphServiceFactory { + TestTransportServiceFactory { inner: service.into_inner(), }, Arc::new( @@ -1662,7 +1662,7 @@ async fn it_supports_server_restart() { let server_factory = AxumHttpServerFactory::new(); let (service, _) = tower_test::mock::spawn(); - let supergraph_service_factory = TestSupergraphServiceFactory { + let supergraph_service_factory = TestTransportServiceFactory { inner: service.into_inner(), }; diff --git a/apollo-router/src/http_server_factory.rs b/apollo-router/src/http_server_factory.rs index 8f185d8d96..a5864ed491 100644 --- a/apollo-router/src/http_server_factory.rs +++ b/apollo-router/src/http_server_factory.rs @@ -11,7 +11,7 @@ use super::router::ApolloRouterError; use crate::configuration::Configuration; use crate::configuration::ListenAddr; use crate::router_factory::Endpoint; -use crate::router_factory::SupergraphServiceFactory; +use crate::router_factory::TransportServiceFactory; /// Factory for creating the http server component. /// @@ -29,7 +29,7 @@ pub(crate) trait HttpServerFactory { extra_endpoints: MultiMap, ) -> Self::Future where - RF: SupergraphServiceFactory; + RF: TransportServiceFactory; } type MainAndExtraListeners = (Listener, Vec<(ListenAddr, Listener)>); @@ -99,7 +99,7 @@ impl HttpServerHandle { ) -> Result where SF: HttpServerFactory, - RF: SupergraphServiceFactory, + RF: TransportServiceFactory, { // we tell the currently running server to stop if let Err(_err) = self.shutdown_sender.send(()) { diff --git a/apollo-router/src/plugin/mod.rs b/apollo-router/src/plugin/mod.rs index 1a5484e869..7e4ec37ed4 100644 --- a/apollo-router/src/plugin/mod.rs +++ b/apollo-router/src/plugin/mod.rs @@ -171,6 +171,13 @@ pub trait Plugin: Send + Sync + 'static { where Self: Sized; + /// This service runs at the very beginning and very end of the request lifecycle. + /// Define supergraph_service if your customization needs to interact at the earliest or latest point possible. + /// For example, this is a good opportunity to perform JWT verification before allowing a request to proceed further. + fn http_service(&self, service: transport::BoxService) -> transport::BoxService { + service + } + /// This service runs at the very beginning and very end of the request lifecycle. /// Define supergraph_service if your customization needs to interact at the earliest or latest point possible. /// For example, this is a good opportunity to perform JWT verification before allowing a request to proceed further. diff --git a/apollo-router/src/plugins/expose_query_plan.rs b/apollo-router/src/plugins/expose_query_plan.rs index 98ee756b3c..655c4ebf8c 100644 --- a/apollo-router/src/plugins/expose_query_plan.rs +++ b/apollo-router/src/plugins/expose_query_plan.rs @@ -123,7 +123,7 @@ mod tests { use crate::json_ext::Object; use crate::plugin::test::MockSubgraph; use crate::plugin::DynPlugin; - use crate::services::PluggableSupergraphServiceBuilder; + use crate::services::PluggableTransportServiceBuilder; use crate::Schema; static EXPECTED_RESPONSE_WITH_QUERY_PLAN: Lazy = Lazy::new(|| { @@ -178,7 +178,7 @@ mod tests { include_str!("../../../apollo-router-benchmarks/benches/fixtures/supergraph.graphql"); let schema = Arc::new(Schema::parse(schema, &Default::default()).unwrap()); - let builder = PluggableSupergraphServiceBuilder::new(schema.clone()); + let builder = PluggableTransportServiceBuilder::new(schema.clone()); let builder = builder .with_dyn_plugin("experimental.expose_query_plan".to_string(), plugin) .with_subgraph_service("accounts", account_service.clone()) diff --git a/apollo-router/src/plugins/include_subgraph_errors.rs b/apollo-router/src/plugins/include_subgraph_errors.rs index 40b1e8f687..47118c5dd5 100644 --- a/apollo-router/src/plugins/include_subgraph_errors.rs +++ b/apollo-router/src/plugins/include_subgraph_errors.rs @@ -93,7 +93,7 @@ mod test { use crate::json_ext::Object; use crate::plugin::test::MockSubgraph; use crate::plugin::DynPlugin; - use crate::PluggableSupergraphServiceBuilder; + use crate::PluggableTransportServiceBuilder; use crate::Schema; use crate::SupergraphRequest; use crate::SupergraphResponse; @@ -189,7 +189,7 @@ mod test { include_str!("../../../apollo-router-benchmarks/benches/fixtures/supergraph.graphql"); let schema = Arc::new(Schema::parse(schema, &Default::default()).unwrap()); - let builder = PluggableSupergraphServiceBuilder::new(schema.clone()); + let builder = PluggableTransportServiceBuilder::new(schema.clone()); let builder = builder .with_dyn_plugin("apollo.include_subgraph_errors".to_string(), plugin) .with_subgraph_service("accounts", account_service.clone()) diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index be952aaaf1..497014e227 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -379,7 +379,7 @@ mod test { use crate::plugin::test::MockSupergraphService; use crate::plugin::DynPlugin; use crate::Configuration; - use crate::PluggableSupergraphServiceBuilder; + use crate::PluggableTransportServiceBuilder; use crate::Schema; use crate::SupergraphRequest; use crate::SupergraphResponse; @@ -462,7 +462,7 @@ mod test { ) .unwrap(); - let builder = PluggableSupergraphServiceBuilder::new(schema.clone()) + let builder = PluggableTransportServiceBuilder::new(schema.clone()) .with_configuration(Arc::new(config)); let builder = builder diff --git a/apollo-router/src/router.rs b/apollo-router/src/router.rs index 88d583f5c4..e0a992bf1d 100644 --- a/apollo-router/src/router.rs +++ b/apollo-router/src/router.rs @@ -40,9 +40,9 @@ use crate::cache::DeduplicatingCache; use crate::configuration::Configuration; use crate::configuration::ListenAddr; use crate::plugin::DynPlugin; -use crate::router_factory::SupergraphServiceConfigurator; -use crate::router_factory::SupergraphServiceFactory; -use crate::router_factory::YamlSupergraphServiceFactory; +use crate::router_factory::TransportServiceConfigurator; +use crate::router_factory::TransportServiceFactory; +use crate::router_factory::YamlTransportServiceFactory; use crate::services::layers::apq::APQLayer; use crate::services::transport; use crate::spec::Schema; @@ -61,7 +61,7 @@ async fn make_transport_service( extra_plugins: Vec<(String, Box)>, ) -> Result { let schema = Arc::new(Schema::parse(schema, &configuration)?); - let service_factory = YamlSupergraphServiceFactory + let service_factory = YamlTransportServiceFactory .create(configuration.clone(), schema, None, Some(extra_plugins)) .await?; @@ -537,7 +537,7 @@ impl RouterHttpServer { shutdown_receiver, ); let server_factory = AxumHttpServerFactory::new(); - let router_factory = YamlSupergraphServiceFactory::default(); + let router_factory = YamlTransportServiceFactory::default(); let state_machine = StateMachine::new(server_factory, router_factory); let extra_listen_adresses = state_machine.extra_listen_adresses.clone(); let graphql_listen_address = state_machine.graphql_listen_address.clone(); diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index 333aae551a..b2551ef4be 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -21,11 +21,11 @@ use crate::plugins::traffic_shaping::APOLLO_TRAFFIC_SHAPING; use crate::services::new_service::NewService; use crate::services::RouterCreator; use crate::services::SubgraphService; -use crate::services::SupergraphRequest; -use crate::services::SupergraphResponse; +use crate::services::TransportRequest; +use crate::services::TransportResponse; use crate::transport; use crate::ListenAddr; -use crate::PluggableSupergraphServiceBuilder; +use crate::PluggableTransportServiceBuilder; use crate::Schema; #[derive(Clone)] @@ -67,16 +67,16 @@ impl Endpoint { axum::Router::new().route(self.path.as_str(), service_fn(handler)) } } -/// Factory for creating a SupergraphService +/// Factory for creating a TransportService /// /// Instances of this traits are used by the HTTP server to generate a new -/// SupergraphService on each request -pub(crate) trait SupergraphServiceFactory: - NewService + Clone + Send + Sync + 'static +/// TransportService on each request +pub(crate) trait TransportServiceFactory: + NewService + Clone + Send + Sync + 'static { - type SupergraphService: Service< - SupergraphRequest, - Response = SupergraphResponse, + type TransportService: Service< + TransportRequest, + Response = TransportResponse, Error = BoxError, Future = Self::Future, > + Send; @@ -85,42 +85,42 @@ pub(crate) trait SupergraphServiceFactory: fn web_endpoints(&self) -> MultiMap; } -/// Factory for creating a SupergraphServiceFactory +/// Factory for creating a TransportServiceFactory /// /// Instances of this traits are used by the StateMachine to generate a new -/// SupergraphServiceFactory from configuration when it changes +/// TransportServiceFactory from configuration when it changes #[async_trait::async_trait] -pub(crate) trait SupergraphServiceConfigurator: Send + Sync + 'static { - type SupergraphServiceFactory: SupergraphServiceFactory; +pub(crate) trait TransportServiceConfigurator: Send + Sync + 'static { + type TransportServiceFactory: TransportServiceFactory; async fn create<'a>( &'a mut self, configuration: Arc, schema: Arc, - previous_router: Option<&'a Self::SupergraphServiceFactory>, + previous_router: Option<&'a Self::TransportServiceFactory>, extra_plugins: Option)>>, - ) -> Result; + ) -> Result; } -/// Main implementation of the SupergraphService factory, supporting the extensions system +/// Main implementation of the TransportService factory, supporting the extensions system #[derive(Default)] -pub(crate) struct YamlSupergraphServiceFactory; +pub(crate) struct YamlTransportServiceFactory; #[async_trait::async_trait] -impl SupergraphServiceConfigurator for YamlSupergraphServiceFactory { - type SupergraphServiceFactory = RouterCreator; +impl TransportServiceConfigurator for YamlTransportServiceFactory { + type TransportServiceFactory = RouterCreator; async fn create<'a>( &'a mut self, configuration: Arc, schema: Arc, - _previous_router: Option<&'a Self::SupergraphServiceFactory>, + _previous_router: Option<&'a Self::TransportServiceFactory>, extra_plugins: Option)>>, - ) -> Result { + ) -> Result { // Process the plugins. let plugins = create_plugins(&configuration, &schema, extra_plugins).await?; - let mut builder = PluggableSupergraphServiceBuilder::new(schema.clone()); + let mut builder = PluggableTransportServiceBuilder::new(schema.clone()); builder = builder.with_configuration(configuration); for (name, _) in schema.subgraphs() { @@ -156,7 +156,7 @@ pub async fn create_test_service_factory_from_yaml(schema: &str, configuration: let schema: Schema = Schema::parse(schema, &Default::default()).unwrap(); - let service = YamlSupergraphServiceFactory::default() + let service = YamlTransportServiceFactory::default() .create(Arc::new(config), Arc::new(schema), None, None) .await; assert_eq!( @@ -338,8 +338,8 @@ mod test { use crate::plugin::PluginInit; use crate::register_plugin; use crate::router_factory::inject_schema_id; - use crate::router_factory::SupergraphServiceConfigurator; - use crate::router_factory::YamlSupergraphServiceFactory; + use crate::router_factory::TransportServiceConfigurator; + use crate::router_factory::YamlTransportServiceFactory; use crate::Schema; #[derive(Debug)] @@ -455,7 +455,7 @@ mod test { let schema = include_str!("testdata/supergraph.graphql"); let schema = Schema::parse(schema, &config).unwrap(); - let service = YamlSupergraphServiceFactory::default() + let service = YamlTransportServiceFactory::default() .create(Arc::new(config), Arc::new(schema), None, None) .await; service.map(|_| ()) diff --git a/apollo-router/src/services/mod.rs b/apollo-router/src/services/mod.rs index 13502d2316..758775e741 100644 --- a/apollo-router/src/services/mod.rs +++ b/apollo-router/src/services/mod.rs @@ -18,6 +18,8 @@ pub(crate) use crate::services::subgraph::Request as SubgraphRequest; pub(crate) use crate::services::subgraph::Response as SubgraphResponse; pub(crate) use crate::services::supergraph::Request as SupergraphRequest; pub(crate) use crate::services::supergraph::Response as SupergraphResponse; +pub(crate) use crate::services::transport::Request as TransportRequest; +pub(crate) use crate::services::transport::Response as TransportResponse; pub mod execution; mod execution_service; @@ -29,6 +31,7 @@ pub(crate) mod subgraph_service; pub mod supergraph; mod supergraph_service; pub mod transport; +mod transport_service; impl AsRef for http_ext::Request { fn as_ref(&self) -> &Request { diff --git a/apollo-router/src/services/router.rs b/apollo-router/src/services/router.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/apollo-router/src/services/router.rs @@ -0,0 +1 @@ + diff --git a/apollo-router/src/services/supergraph_service.rs b/apollo-router/src/services/supergraph_service.rs index c409359174..5e1f71447a 100644 --- a/apollo-router/src/services/supergraph_service.rs +++ b/apollo-router/src/services/supergraph_service.rs @@ -36,7 +36,7 @@ use crate::plugins::traffic_shaping::APOLLO_TRAFFIC_SHAPING; use crate::query_planner::BridgeQueryPlanner; use crate::query_planner::CachingQueryPlanner; use crate::router_factory::Endpoint; -use crate::router_factory::SupergraphServiceFactory; +use crate::router_factory::TransportServiceFactory; use crate::services::layers::ensure_query_presence::EnsureQueryPresence; use crate::Configuration; use crate::Context; @@ -252,194 +252,6 @@ async fn plan_query( .await } -/// Builder which generates a plugin pipeline. -/// -/// This is at the heart of the delegation of responsibility model for the router. A schema, -/// collection of plugins, collection of subgraph services are assembled to generate a -/// [`tower::util::BoxCloneService`] capable of processing a router request -/// through the entire stack to return a response. -pub(crate) struct PluggableSupergraphServiceBuilder { - schema: Arc, - plugins: Plugins, - subgraph_services: Vec<(String, Arc)>, - configuration: Option>, -} - -impl PluggableSupergraphServiceBuilder { - pub(crate) fn new(schema: Arc) -> Self { - Self { - schema, - plugins: Default::default(), - subgraph_services: Default::default(), - configuration: None, - } - } - - pub(crate) fn with_dyn_plugin( - mut self, - plugin_name: String, - plugin: Box, - ) -> PluggableSupergraphServiceBuilder { - self.plugins.insert(plugin_name, plugin); - self - } - - pub(crate) fn with_subgraph_service( - mut self, - name: &str, - service_maker: S, - ) -> PluggableSupergraphServiceBuilder - where - S: MakeSubgraphService, - { - self.subgraph_services - .push((name.to_string(), Arc::new(service_maker))); - self - } - - pub(crate) fn with_configuration( - mut self, - configuration: Arc, - ) -> PluggableSupergraphServiceBuilder { - self.configuration = Some(configuration); - self - } - - pub(crate) async fn build(self) -> Result { - // Note: The plugins are always applied in reverse, so that the - // fold is applied in the correct sequence. We could reverse - // the list of plugins, but we want them back in the original - // order at the end of this function. Instead, we reverse the - // various iterators that we create for folding and leave - // the plugins in their original order. - - let configuration = self.configuration.unwrap_or_default(); - - let plan_cache_limit = std::env::var("ROUTER_PLAN_CACHE_LIMIT") - .ok() - .and_then(|x| x.parse().ok()) - .unwrap_or(100); - let redis_urls = configuration.supergraph.cache(); - - let introspection = if configuration.supergraph.introspection { - Some(Arc::new(Introspection::new(&configuration).await)) - } else { - None - }; - - // QueryPlannerService takes an UnplannedRequest and outputs PlannedRequest - let bridge_query_planner = - BridgeQueryPlanner::new(self.schema.clone(), introspection, configuration) - .await - .map_err(ServiceBuildError::QueryPlannerError)?; - let query_planner_service = CachingQueryPlanner::new( - bridge_query_planner, - plan_cache_limit, - self.schema.schema_id.clone(), - redis_urls, - ) - .await; - - let plugins = Arc::new(self.plugins); - - let subgraph_creator = Arc::new(SubgraphCreator::new( - self.subgraph_services, - plugins.clone(), - )); - - Ok(RouterCreator { - query_planner_service, - subgraph_creator, - schema: self.schema, - plugins, - }) - } -} - -/// A collection of services and data which may be used to create a "router". -#[derive(Clone)] -pub(crate) struct RouterCreator { - query_planner_service: CachingQueryPlanner, - subgraph_creator: Arc, - schema: Arc, - plugins: Arc, -} - -impl NewService for RouterCreator { - type Service = BoxService; - fn new_service(&self) -> Self::Service { - self.make().boxed() - } -} - -impl SupergraphServiceFactory for RouterCreator { - type SupergraphService = BoxService; - - type Future = <>::Service as Service< - SupergraphRequest, - >>::Future; - - fn web_endpoints(&self) -> MultiMap { - let mut mm = MultiMap::new(); - self.plugins - .values() - .for_each(|p| mm.extend(p.web_endpoints())); - mm - } -} - -impl RouterCreator { - pub(crate) fn make( - &self, - ) -> impl Service< - SupergraphRequest, - Response = SupergraphResponse, - Error = BoxError, - Future = BoxFuture<'static, Result>, - > + Send { - let supergraph_service = SupergraphService::builder() - .query_planner_service(self.query_planner_service.clone()) - .execution_service_factory(ExecutionCreator { - schema: self.schema.clone(), - plugins: self.plugins.clone(), - subgraph_creator: self.subgraph_creator.clone(), - }) - .schema(self.schema.clone()) - .build(); - - let supergraph_service = match self - .plugins - .iter() - .find(|i| i.0.as_str() == APOLLO_TRAFFIC_SHAPING) - .and_then(|plugin| plugin.1.as_any().downcast_ref::()) - { - Some(shaping) => Either::A(shaping.supergraph_service_internal(supergraph_service)), - None => Either::B(supergraph_service), - }; - - ServiceBuilder::new() - .layer(EnsureQueryPresence::default()) - .service( - self.plugins - .iter() - .rev() - .fold(BoxService::new(supergraph_service), |acc, (_, e)| { - e.supergraph_service(acc) - }), - ) - } - - /// Create a test service. - #[cfg(test)] - pub(crate) fn test_service( - &self, - ) -> tower::util::BoxCloneService { - use tower::buffer::Buffer; - - Buffer::new(self.make(), 512).boxed_clone() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/apollo-router/src/services/transport.rs b/apollo-router/src/services/transport.rs index 23292a6359..66d0ab6d1d 100644 --- a/apollo-router/src/services/transport.rs +++ b/apollo-router/src/services/transport.rs @@ -1,9 +1,30 @@ #![allow(missing_docs)] // FIXME +use bytes::Bytes; +use futures::stream::StreamExt; +use static_assertions::assert_impl_all; use tower::BoxError; -pub type Request = http::Request; -pub type Response = http::Response; +use crate::Context; + pub type BoxService = tower::util::BoxService; pub type BoxCloneService = tower::util::BoxCloneService; pub type ServiceResult = Result; +pub type Response = http::Response; +#[non_exhaustive] +pub struct Request { + /// Original request to the Router. + pub http_request: http::Request, + + /// Context for extension + pub context: Context, +} + +impl From> for Request { + fn from(http_request: http::Request) -> Self { + Self { + http_request, + context: Context::new(), + } + } +} diff --git a/apollo-router/src/services/transport_service.rs b/apollo-router/src/services/transport_service.rs new file mode 100644 index 0000000000..a7fdec1070 --- /dev/null +++ b/apollo-router/src/services/transport_service.rs @@ -0,0 +1,543 @@ +//! Implements the router phase of the request lifecycle. + +use std::sync::Arc; +use std::task::Poll; + +use futures::future::BoxFuture; +use futures::stream::StreamExt; +use futures::TryFutureExt; +use http::StatusCode; +use indexmap::IndexMap; +use multimap::MultiMap; +use opentelemetry::trace::SpanKind; +use tower::util::BoxService; +use tower::util::Either; +use tower::BoxError; +use tower::ServiceBuilder; +use tower::ServiceExt; +use tower_service::Service; +use tracing_futures::Instrument; + +use super::new_service::NewService; +use super::subgraph_service::MakeSubgraphService; +use super::subgraph_service::SubgraphCreator; +use super::ExecutionCreator; +use super::ExecutionServiceFactory; +use super::QueryPlannerContent; +use crate::axum_factory::utils::accepts_multipart; +use crate::error::CacheResolverError; +use crate::error::ServiceBuildError; +use crate::graphql; +use crate::graphql::IntoGraphQLErrors; +use crate::introspection::Introspection; +use crate::plugin::DynPlugin; +use crate::plugins::traffic_shaping::TrafficShaping; +use crate::plugins::traffic_shaping::APOLLO_TRAFFIC_SHAPING; +use crate::query_planner::BridgeQueryPlanner; +use crate::query_planner::CachingQueryPlanner; +use crate::router_factory::Endpoint; +use crate::router_factory::TransportServiceFactory; +use crate::services::layers::ensure_query_presence::EnsureQueryPresence; +use crate::Configuration; +use crate::Context; +use crate::ExecutionRequest; +use crate::ExecutionResponse; +use crate::ListenAddr; +use crate::QueryPlannerRequest; +use crate::QueryPlannerResponse; +use crate::Schema; +use crate::TransportRequest; +use crate::TransportResponse; + +/// An [`IndexMap`] of available plugins. +pub(crate) type Plugins = IndexMap>; + +/// Containing [`Service`] in the request lifecyle. +#[derive(Clone)] +pub(crate) struct TransportService { + supergraph_service: SupergraphService, + // apq_layer: APQ, +} + +#[buildstructor::buildstructor] +impl TransportService { + #[builder] + pub(crate) fn new( + supergraph_service: SupergraphService, + // apq_layer: APQ, + ) -> Self { + Self { + supergraph_service, + // apq, + } + } +} + +impl Service for TransportService +where + ExecutionFactory: ExecutionServiceFactory, +{ + type Response = TransportResponse; + type Error = BoxError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.supergraph_service + .poll_ready(cx) + .map_err(|err| err.into()) + } + + fn call(&mut self, req: TransportRequest) -> Self::Future { + // Consume our cloned services and allow ownership to be transferred to the async block. + let clone = self.supergraph_service.clone(); + let supergraph_service = std::mem::replace(&mut self.supergraph_service, clone); + supergraph_service.call(req) + } +} + +/// Builder which generates a plugin pipeline. +/// +/// This is at the heart of the delegation of responsibility model for the router. A schema, +/// collection of plugins, collection of subgraph services are assembled to generate a +/// [`tower::util::BoxCloneService`] capable of processing a router request +/// through the entire stack to return a response. +pub(crate) struct PluggableTransportServiceBuilder { + schema: Arc, + plugins: Plugins, + subgraph_services: Vec<(String, Arc)>, + configuration: Option>, +} + +impl PluggableTransportServiceBuilder { + pub(crate) fn new(schema: Arc) -> Self { + Self { + schema, + plugins: Default::default(), + subgraph_services: Default::default(), + configuration: None, + } + } + + pub(crate) fn with_dyn_plugin( + mut self, + plugin_name: String, + plugin: Box, + ) -> PluggableTransportServiceBuilder { + self.plugins.insert(plugin_name, plugin); + self + } + + pub(crate) fn with_subgraph_service( + mut self, + name: &str, + service_maker: S, + ) -> PluggableTransportServiceBuilder + where + S: MakeSubgraphService, + { + self.subgraph_services + .push((name.to_string(), Arc::new(service_maker))); + self + } + + pub(crate) fn with_configuration( + mut self, + configuration: Arc, + ) -> PluggableTransportServiceBuilder { + self.configuration = Some(configuration); + self + } + + pub(crate) async fn build(self) -> Result { + // Note: The plugins are always applied in reverse, so that the + // fold is applied in the correct sequence. We could reverse + // the list of plugins, but we want them back in the original + // order at the end of this function. Instead, we reverse the + // various iterators that we create for folding and leave + // the plugins in their original order. + + let configuration = self.configuration.unwrap_or_default(); + + let plan_cache_limit = std::env::var("ROUTER_PLAN_CACHE_LIMIT") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(100); + let redis_urls = configuration.Transport.cache(); + + let introspection = if configuration.Transport.introspection { + Some(Arc::new(Introspection::new(&configuration).await)) + } else { + None + }; + + // QueryPlannerService takes an UnplannedRequest and outputs PlannedRequest + let bridge_query_planner = + BridgeQueryPlanner::new(self.schema.clone(), introspection, configuration) + .await + .map_err(ServiceBuildError::QueryPlannerError)?; + let query_planner_service = CachingQueryPlanner::new( + bridge_query_planner, + plan_cache_limit, + self.schema.schema_id.clone(), + redis_urls, + ) + .await; + + let plugins = Arc::new(self.plugins); + + let subgraph_creator = Arc::new(SubgraphCreator::new( + self.subgraph_services, + plugins.clone(), + )); + + Ok(RouterCreator { + query_planner_service, + subgraph_creator, + schema: self.schema, + plugins, + }) + } +} + +/// A collection of services and data which may be used to create a "router". +#[derive(Clone)] +pub(crate) struct RouterCreator { + query_planner_service: CachingQueryPlanner, + subgraph_creator: Arc, + schema: Arc, + plugins: Arc, +} + +impl NewService for RouterCreator { + type Service = BoxService; + fn new_service(&self) -> Self::Service { + self.make().boxed() + } +} + +impl TransportServiceFactory for RouterCreator { + type TransportService = BoxService; + + type Future = <>::Service as Service< + TransportRequest, + >>::Future; + + fn web_endpoints(&self) -> MultiMap { + let mut mm = MultiMap::new(); + self.plugins + .values() + .for_each(|p| mm.extend(p.web_endpoints())); + mm + } +} + +impl RouterCreator { + pub(crate) fn make( + &self, + ) -> impl Service< + TransportRequest, + Response = TransportResponse, + Error = BoxError, + Future = BoxFuture<'static, Result>, + > + Send { + let Transport_service = TransportService::builder() + .query_planner_service(self.query_planner_service.clone()) + .execution_service_factory(ExecutionCreator { + schema: self.schema.clone(), + plugins: self.plugins.clone(), + subgraph_creator: self.subgraph_creator.clone(), + }) + .schema(self.schema.clone()) + .build(); + + let Transport_service = match self + .plugins + .iter() + .find(|i| i.0.as_str() == APOLLO_TRAFFIC_SHAPING) + .and_then(|plugin| plugin.1.as_any().downcast_ref::()) + { + Some(shaping) => Either::A(shaping.Transport_service_internal(Transport_service)), + None => Either::B(Transport_service), + }; + + ServiceBuilder::new() + .layer(EnsureQueryPresence::default()) + .service( + self.plugins + .iter() + .rev() + .fold(BoxService::new(Transport_service), |acc, (_, e)| { + e.Transport_service(acc) + }), + ) + } + + /// Create a test service. + #[cfg(test)] + pub(crate) fn test_service( + &self, + ) -> tower::util::BoxCloneService { + use tower::buffer::Buffer; + + Buffer::new(self.make(), 512).boxed_clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::plugin::test::MockSubgraph; + use crate::services::Transport; + use crate::test_harness::MockedSubgraphs; + use crate::TestHarness; + + const SCHEMA: &str = r#"schema + @core(feature: "https://specs.apollo.dev/core/v0.1") + @core(feature: "https://specs.apollo.dev/join/v0.1") + @core(feature: "https://specs.apollo.dev/inaccessible/v0.1") + { + query: Query + } + directive @core(feature: String!) repeatable on SCHEMA + directive @join__field(graph: join__Graph, requires: join__FieldSet, provides: join__FieldSet) on FIELD_DEFINITION + directive @join__type(graph: join__Graph!, key: join__FieldSet) repeatable on OBJECT | INTERFACE + directive @join__owner(graph: join__Graph!) on OBJECT | INTERFACE + directive @join__graph(name: String!, url: String!) on ENUM_VALUE + directive @inaccessible on OBJECT | FIELD_DEFINITION | INTERFACE | UNION + scalar join__FieldSet + + enum join__Graph { + USER @join__graph(name: "user", url: "http://localhost:4001/graphql") + ORGA @join__graph(name: "orga", url: "http://localhost:4002/graphql") + } + + type Query { + currentUser: User @join__field(graph: USER) + } + + type User + @join__owner(graph: USER) + @join__type(graph: ORGA, key: "id") + @join__type(graph: USER, key: "id"){ + id: ID! + name: String + activeOrganization: Organization + } + + type Organization + @join__owner(graph: ORGA) + @join__type(graph: ORGA, key: "id") + @join__type(graph: USER, key: "id") { + id: ID + creatorUser: User + name: String + nonNullId: ID! + suborga: [Organization] + }"#; + + #[tokio::test] + async fn nullability_formatting() { + let subgraphs = MockedSubgraphs([ + ("user", MockSubgraph::builder().with_json( + serde_json::json!{{"query":"{currentUser{activeOrganization{__typename id}}}"}}, + serde_json::json!{{"data": {"currentUser": { "activeOrganization": null }}}} + ).build()), + ("orga", MockSubgraph::default()) + ].into_iter().collect()); + + let service = TestHarness::builder() + .configuration_json(serde_json::json!({"include_subgraph_errors": { "all": true } })) + .unwrap() + .schema(SCHEMA) + .extra_plugin(subgraphs) + .build() + .await + .unwrap(); + + let request = Transport::Request::fake_builder() + .query("query { currentUser { activeOrganization { id creatorUser { name } } } }") + // Request building here + .build() + .unwrap(); + let response = service + .oneshot(request) + .await + .unwrap() + .next_response() + .await + .unwrap(); + + insta::assert_json_snapshot!(response); + } + + #[tokio::test] + async fn nullability_bubbling() { + let subgraphs = MockedSubgraphs([ + ("user", MockSubgraph::builder().with_json( + serde_json::json!{{"query":"{currentUser{activeOrganization{__typename id}}}"}}, + serde_json::json!{{"data": {"currentUser": { "activeOrganization": {} }}}} + ).build()), + ("orga", MockSubgraph::default()) + ].into_iter().collect()); + + let service = TestHarness::builder() + .configuration_json(serde_json::json!({"include_subgraph_errors": { "all": true } })) + .unwrap() + .schema(SCHEMA) + .extra_plugin(subgraphs) + .build() + .await + .unwrap(); + + let request = Transport::Request::fake_builder() + .query( + "query { currentUser { activeOrganization { nonNullId creatorUser { name } } } }", + ) + .build() + .unwrap(); + let response = service + .oneshot(request) + .await + .unwrap() + .next_response() + .await + .unwrap(); + + insta::assert_json_snapshot!(response); + } + + #[tokio::test] + async fn errors_on_deferred_responses() { + let subgraphs = MockedSubgraphs([ + ("user", MockSubgraph::builder().with_json( + serde_json::json!{{"query":"{currentUser{__typename id}}"}}, + serde_json::json!{{"data": {"currentUser": { "__typename": "User", "id": "0" }}}} + ) + .with_json( + serde_json::json!{{ + "query":"query($representations:[_Any!]!){_entities(representations:$representations){...on User{name}}}", + "variables": { + "representations":[{"__typename": "User", "id":"0"}] + } + }}, + serde_json::json!{{ + "data": { + "_entities": [{ "suborga": [ + { "__typename": "User", "name": "AAA"}, + ] }] + }, + "errors": [ + { + "message": "error user 0", + "path": ["_entities", 0], + } + ] + }} + ).build()), + ("orga", MockSubgraph::default()) + ].into_iter().collect()); + + let service = TestHarness::builder() + .configuration_json(serde_json::json!({"include_subgraph_errors": { "all": true } })) + .unwrap() + .schema(SCHEMA) + .extra_plugin(subgraphs) + .build() + .await + .unwrap(); + + let request = Transport::Request::fake_builder() + .header("Accept", "multipart/mixed; deferSpec=20220824") + .query("query { currentUser { id ...@defer { name } } }") + .build() + .unwrap(); + + let mut stream = service.oneshot(request).await.unwrap(); + + insta::assert_json_snapshot!(stream.next_response().await.unwrap()); + + insta::assert_json_snapshot!(stream.next_response().await.unwrap()); + } + + #[tokio::test] + async fn errors_on_incremental_responses() { + let subgraphs = MockedSubgraphs([ + ("user", MockSubgraph::builder().with_json( + serde_json::json!{{"query":"{currentUser{activeOrganization{__typename id}}}"}}, + serde_json::json!{{"data": {"currentUser": { "activeOrganization": { "__typename": "Organization", "id": "0" } }}}} + ).build()), + ("orga", MockSubgraph::builder().with_json( + serde_json::json!{{ + "query":"query($representations:[_Any!]!){_entities(representations:$representations){...on Organization{suborga{__typename id}}}}", + "variables": { + "representations":[{"__typename": "Organization", "id":"0"}] + } + }}, + serde_json::json!{{ + "data": { + "_entities": [{ "suborga": [ + { "__typename": "Organization", "id": "1"}, + { "__typename": "Organization", "id": "2"}, + { "__typename": "Organization", "id": "3"}, + ] }] + }, + }} + ) + .with_json( + serde_json::json!{{ + "query":"query($representations:[_Any!]!){_entities(representations:$representations){...on Organization{name}}}", + "variables": { + "representations":[ + {"__typename": "Organization", "id":"1"}, + {"__typename": "Organization", "id":"2"}, + {"__typename": "Organization", "id":"3"} + + ] + } + }}, + serde_json::json!{{ + "data": { + "_entities": [ + { "__typename": "Organization", "id": "1"}, + { "__typename": "Organization", "id": "2", "name": "A"}, + { "__typename": "Organization", "id": "3"}, + ] + }, + "errors": [ + { + "message": "error orga 1", + "path": ["_entities", 0], + }, + { + "message": "error orga 3", + "path": ["_entities", 2], + } + ] + }} + ).build()) + ].into_iter().collect()); + + let service = TestHarness::builder() + .configuration_json(serde_json::json!({"include_subgraph_errors": { "all": true } })) + .unwrap() + .schema(SCHEMA) + .extra_plugin(subgraphs) + .build() + .await + .unwrap(); + + let request = Transport::Request::fake_builder() + .header("Accept", "multipart/mixed; deferSpec=20220824") + .query( + "query { currentUser { activeOrganization { id suborga { id ...@defer { name } } } } }", + ) + .build() + .unwrap(); + + let mut stream = service.oneshot(request).await.unwrap(); + + insta::assert_json_snapshot!(stream.next_response().await.unwrap()); + + insta::assert_json_snapshot!(stream.next_response().await.unwrap()); + } +} diff --git a/apollo-router/src/state_machine.rs b/apollo-router/src/state_machine.rs index 80abdfb5f3..055dfb048d 100644 --- a/apollo-router/src/state_machine.rs +++ b/apollo-router/src/state_machine.rs @@ -23,8 +23,8 @@ use super::state_machine::State::Startup; use super::state_machine::State::Stopped; use crate::configuration::Configuration; use crate::configuration::ListenAddr; -use crate::router_factory::SupergraphServiceConfigurator; -use crate::router_factory::SupergraphServiceFactory; +use crate::router_factory::TransportServiceConfigurator; +use crate::router_factory::TransportServiceFactory; use crate::Schema; /// This state maintains private information that is not exposed to the user via state listener. @@ -67,7 +67,7 @@ impl Display for State { pub(crate) struct StateMachine where S: HttpServerFactory, - FA: SupergraphServiceConfigurator, + FA: TransportServiceConfigurator, { http_server_factory: S, router_configurator: FA, @@ -82,8 +82,8 @@ where impl StateMachine where S: HttpServerFactory, - FA: SupergraphServiceConfigurator + Send, - FA::SupergraphServiceFactory: SupergraphServiceFactory, + FA: TransportServiceConfigurator + Send, + FA::TransportServiceFactory: TransportServiceFactory, { pub(crate) fn new(http_server_factory: S, router_factory: FA) -> Self { let graphql_ready = Arc::new(RwLock::new(None)); @@ -262,7 +262,7 @@ where async fn maybe_update_listen_addresses( &mut self, - state: &mut State<::SupergraphServiceFactory>, + state: &mut State<::TransportServiceFactory>, ) { let (graphql_listen_address, extra_listen_addresses) = if let Running { server_handle, .. } = &state { @@ -288,10 +288,10 @@ where async fn maybe_transition_to_running( &mut self, - state: State<::SupergraphServiceFactory>, + state: State<::TransportServiceFactory>, ) -> Result< - State<::SupergraphServiceFactory>, - State<::SupergraphServiceFactory>, + State<::TransportServiceFactory>, + State<::TransportServiceFactory>, > { if let Startup { configuration: Some(configuration), @@ -354,13 +354,13 @@ where &mut self, configuration: Arc, schema: Arc, - router_service: ::SupergraphServiceFactory, + router_service: ::TransportServiceFactory, server_handle: HttpServerHandle, new_configuration: Option>, new_schema: Option>, ) -> Result< - State<::SupergraphServiceFactory>, - State<::SupergraphServiceFactory>, + State<::TransportServiceFactory>, + State<::TransportServiceFactory>, > { let new_schema = new_schema.unwrap_or_else(|| schema.clone()); let new_configuration = new_configuration.unwrap_or_else(|| configuration.clone()); @@ -449,8 +449,8 @@ mod tests { use crate::http_server_factory::Listener; use crate::plugin::DynPlugin; use crate::router_factory::Endpoint; - use crate::router_factory::SupergraphServiceConfigurator; - use crate::router_factory::SupergraphServiceFactory; + use crate::router_factory::TransportServiceConfigurator; + use crate::router_factory::TransportServiceFactory; use crate::services::new_service::NewService; use crate::services::SupergraphRequest; use crate::services::SupergraphResponse; @@ -654,8 +654,8 @@ mod tests { MyRouterConfigurator {} #[async_trait::async_trait] - impl SupergraphServiceConfigurator for MyRouterConfigurator { - type SupergraphServiceFactory = MockMyRouterFactory; + impl TransportServiceConfigurator for MyRouterConfigurator { + type TransportServiceFactory = MockMyRouterFactory; async fn create<'a>( &'a mut self, @@ -671,7 +671,7 @@ mod tests { #[derive(Debug)] MyRouterFactory {} - impl SupergraphServiceFactory for MyRouterFactory { + impl TransportServiceFactory for MyRouterFactory { type SupergraphService = MockMyRouter; type Future = >::Future; fn web_endpoints(&self) -> MultiMap; @@ -734,7 +734,7 @@ mod tests { _web_endpoints: MultiMap, ) -> Self::Future where - RF: SupergraphServiceFactory, + RF: TransportServiceFactory, { let res = self.create_server(configuration, main_listener); Box::pin(async move { res }) diff --git a/apollo-router/src/test_harness.rs b/apollo-router/src/test_harness.rs index 93de732c4b..bbdbb4ae55 100644 --- a/apollo-router/src/test_harness.rs +++ b/apollo-router/src/test_harness.rs @@ -11,8 +11,8 @@ use crate::plugin::test::MockSubgraph; use crate::plugin::DynPlugin; use crate::plugin::Plugin; use crate::plugin::PluginInit; -use crate::router_factory::SupergraphServiceConfigurator; -use crate::router_factory::YamlSupergraphServiceFactory; +use crate::router_factory::TransportServiceConfigurator; +use crate::router_factory::YamlTransportServiceFactory; use crate::services::execution; use crate::services::layers::apq::APQLayer; use crate::services::subgraph; @@ -201,7 +201,7 @@ impl<'a> TestHarness<'a> { let canned_schema = include_str!("../testing_schema.graphql"); let schema = builder.schema.unwrap_or(canned_schema); let schema = Arc::new(Schema::parse(schema, &config)?); - let router_creator = YamlSupergraphServiceFactory + let router_creator = YamlTransportServiceFactory .create(config.clone(), schema, None, Some(builder.extra_plugins)) .await?; @@ -226,7 +226,7 @@ impl<'a> TestHarness<'a> { pub(crate) async fn build_http_service(self) -> Result { use crate::axum_factory::make_axum_router; use crate::axum_factory::ListenAddrAndRouter; - use crate::router_factory::SupergraphServiceFactory; + use crate::router_factory::TransportServiceFactory; let (config, router_creator) = self.build_common().await?; let web_endpoints = router_creator.web_endpoints();