diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 196565f4c0..e1868776e6 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -62,7 +62,18 @@ It is now possible to create a subscriber and pass it explicitely to the telemet when creating it. It will then be modified to integrate the telemetry plugin's layer. By [@geal](https://github.com/geal) in https://github.com/apollographql/router/pull/1463 ->>>>>>> be6590826afb60bf3c683315c68f9ed47594a2a3 + + +### Reorder query planner execution ([PR #1484](https://github.com/apollographql/router/pull/1484)) + +Query planning is deterministic, it only depends on the query, operation name and query planning +options. As such, we can cache the result of the entire process. + +This changes the pipeline to apply query planner plugins between the cache and the bridge planner, +so those plugins will only be called once on the same query. If changes must be done per query, +they should happen in a supergraph service. + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/1484 ## 🚀 Features diff --git a/apollo-router/src/error.rs b/apollo-router/src/error.rs index b81ff8ee38..52bc63bd08 100644 --- a/apollo-router/src/error.rs +++ b/apollo-router/src/error.rs @@ -15,6 +15,7 @@ use serde::Deserialize; use serde::Serialize; use thiserror::Error; use tokio::task::JoinError; +use tower::BoxError; use tracing::level_filters::LevelFilter; pub use crate::configuration::ConfigurationError; @@ -157,11 +158,11 @@ impl From for FetchError { #[derive(Error, Debug, Display, Clone)] pub enum CacheResolverError { /// value retrieval failed: {0} - RetrievalError(Arc), + RetrievalError(Arc), } -impl From for CacheResolverError { - fn from(err: QueryPlannerError) -> Self { +impl From for CacheResolverError { + fn from(err: BoxError) -> Self { CacheResolverError::RetrievalError(Arc::new(err)) } } diff --git a/apollo-router/src/lib.rs b/apollo-router/src/lib.rs index 4dc4db04f2..e619f077dc 100644 --- 
a/apollo-router/src/lib.rs +++ b/apollo-router/src/lib.rs @@ -69,7 +69,6 @@ mod router_factory; pub mod services; mod spec; mod state_machine; -mod traits; pub use configuration::*; pub use context::Context; diff --git a/apollo-router/src/plugin/mod.rs b/apollo-router/src/plugin/mod.rs index c84cf5f3d4..cd5bb0b87c 100644 --- a/apollo-router/src/plugin/mod.rs +++ b/apollo-router/src/plugin/mod.rs @@ -173,6 +173,10 @@ pub trait Plugin: Send + Sync + 'static + Sized { /// This service handles generating the query plan for each incoming request. /// Define `query_planning_service` if your customization needs to interact with query planning functionality (for example, to log query plan details). + /// + /// Query planning uses a cache that will store the result of the query planner and query planning plugins execution, so if the same query is + /// performed twice, the query planner plugins will only see it once. The caching key contains the query and operation name. If modifications + /// must be performed on the query, they should be done in router service plugins. fn query_planning_service( &self, service: BoxService, @@ -238,6 +242,10 @@ pub trait DynPlugin: Send + Sync + 'static { /// This service handles generating the query plan for each incoming request. /// Define `query_planning_service` if your customization needs to interact with query planning functionality (for example, to log query plan details). + /// + /// Query planning uses a cache that will store the result of the query planner and query planning plugins execution, so if the same query is + /// performed twice, the query planner plugins will only see it once. The caching key contains the query and operation name. If modifications + /// must be performed on the query, they should be done in router service plugins. 
fn query_planning_service( &self, service: BoxService, diff --git a/apollo-router/src/query_planner/bridge_query_planner.rs b/apollo-router/src/query_planner/bridge_query_planner.rs index cabd19e278..7fd9f1693b 100644 --- a/apollo-router/src/query_planner/bridge_query_planner.rs +++ b/apollo-router/src/query_planner/bridge_query_planner.rs @@ -3,7 +3,6 @@ use std::fmt::Debug; use std::sync::Arc; -use async_trait::async_trait; use futures::future::BoxFuture; use opentelemetry::trace::SpanKind; use router_bridge::planner::DeferStreamSupport; @@ -16,12 +15,11 @@ use tower::Service; use tracing::Instrument; use super::PlanNode; +use super::QueryKey; use super::QueryPlanOptions; use crate::error::QueryPlannerError; use crate::introspection::Introspection; use crate::services::QueryPlannerContent; -use crate::traits::QueryKey; -use crate::traits::QueryPlanner; use crate::*; pub(crate) static USAGE_REPORTING: &str = "apollo_telemetry::usage_reporting"; @@ -172,8 +170,7 @@ impl Service for BridgeQueryPlanner { } } -#[async_trait] -impl QueryPlanner for BridgeQueryPlanner { +impl BridgeQueryPlanner { async fn get(&self, key: QueryKey) -> Result { let selections = self.parse_selections(key.0.clone()).await?; diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index cc578ccebf..9d7199c3e6 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -1,89 +1,49 @@ use std::collections::HashMap; -use std::ops::Deref; use std::sync::Arc; use std::task; -use async_trait::async_trait; use futures::future::BoxFuture; use router_bridge::planner::UsageReporting; use serde::Serialize; use serde_json_bytes::value::Serializer; +use tower::BoxError; +use tower::ServiceExt; +use super::QueryKey; use super::USAGE_REPORTING; use crate::cache::DeduplicatingCache; use crate::error::CacheResolverError; use crate::error::QueryPlannerError; use 
crate::services::QueryPlannerContent; -use crate::traits::CacheResolver; -use crate::traits::QueryKey; -use crate::traits::QueryPlanner; use crate::*; -type PlanResult = Result; - /// A query planner wrapper that caches results. /// /// The query planner performs LRU caching. #[derive(Clone)] -pub(crate) struct CachingQueryPlanner { - cache: Arc>>, - delegate: Arc, -} - -/// A resolver for cache misses -struct CachingQueryPlannerResolver { +pub(crate) struct CachingQueryPlanner { + cache: Arc>>>, delegate: T, } -impl CachingQueryPlanner { +impl CachingQueryPlanner +where + T: tower::Service, +{ /// Creates a new query planner that caches the results of another [`QueryPlanner`]. pub(crate) async fn new(delegate: T, plan_cache_limit: usize) -> CachingQueryPlanner { let cache = Arc::new(DeduplicatingCache::with_capacity(plan_cache_limit).await); - Self { - cache, - delegate: Arc::new(delegate), - } - } -} - -#[async_trait] -impl CacheResolver - for CachingQueryPlannerResolver -{ - async fn retrieve(&self, key: QueryKey) -> Result { - self.delegate.get(key).await.map_err(|err| err.into()) + Self { cache, delegate } } } -#[async_trait] -impl QueryPlanner for CachingQueryPlanner { - async fn get(&self, key: QueryKey) -> PlanResult { - let entry = self.cache.get(&key).await; - if entry.is_first() { - let res = self.delegate.get(key).await; - entry.insert(res.clone()).await; - res - } else { - entry - .get() - .await - .map_err(|_| QueryPlannerError::UnhandledPlannerResult)? 
- } - } -} - -impl tower::Service - for CachingQueryPlanner +impl tower::Service for CachingQueryPlanner where - T: tower::Service< - QueryPlannerRequest, - Response = QueryPlannerResponse, - Error = tower::BoxError, - >, + T: tower::Service, + >::Future: Send, { type Response = QueryPlannerResponse; - // TODO I don't think we can serialize this error back to the router response's payload - type Error = tower::BoxError; + type Error = BoxError; type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll> { @@ -91,67 +51,118 @@ where } fn call(&mut self, request: QueryPlannerRequest) -> Self::Future { - let key = ( - request.query.clone(), - request.operation_name.to_owned(), - request.query_plan_options, - ); - let qp = self.clone(); + let mut qp = self.clone(); Box::pin(async move { - qp.get(key) - .await - .map(|query_planner_content| { - if let QueryPlannerContent::Plan { plan, .. } = &query_planner_content { - match (&plan.usage_reporting).serialize(Serializer) { - Ok(v) => { - request.context.insert_json_value(USAGE_REPORTING, v); - } - Err(e) => { - tracing::error!( - "usage reporting was not serializable to context, {}", - e - ); + let key = ( + request.query.clone(), + request.operation_name.to_owned(), + request.query_plan_options.clone(), + ); + let context = request.context.clone(); + let entry = qp.cache.get(&key).await; + if entry.is_first() { + let res = qp.delegate.ready().await?.call(request).await; + match res { + Ok(QueryPlannerResponse { content, context }) => { + entry.insert(Ok(content.clone())).await; + + if let QueryPlannerContent::Plan { plan, .. 
} = &content { + match (&plan.usage_reporting).serialize(Serializer) { + Ok(v) => { + context.insert_json_value(USAGE_REPORTING, v); + } + Err(e) => { + tracing::error!( + "usage reporting was not serializable to context, {}", + e + ); + } } } + Ok(QueryPlannerResponse { content, context }) } - query_planner_content - }) - .map_err(|e| { - let e = e.into(); - - let CacheResolverError::RetrievalError(re) = &e; - if let QueryPlannerError::PlanningErrors(pe) = re.deref() { - if let Err(inner_e) = request - .context - .insert(USAGE_REPORTING, pe.usage_reporting.clone()) - { - tracing::error!( - "usage reporting was not serializable to context, {}", - inner_e - ); - } - } else if let QueryPlannerError::SpecError(e) = re.deref() { - let error_key = match e { - SpecError::ParsingError(_) => "## GraphQLParseFailure\n", - _ => "## GraphQLValidationFailure\n", - }; - if let Err(inner_e) = request.context.insert( - USAGE_REPORTING, - UsageReporting { - stats_report_key: error_key.to_string(), - referenced_fields_by_type: HashMap::new(), - }, - ) { - tracing::error!( - "usage reporting was not serializable to context, {}", - inner_e - ); + Err(error) => { + match error.downcast_ref::() { + Some(QueryPlannerError::PlanningErrors(pe)) => { + if let Err(inner_e) = + context.insert(USAGE_REPORTING, pe.usage_reporting.clone()) + { + tracing::error!( + "usage reporting was not serializable to context, {}", + inner_e + ); + } + } + Some(QueryPlannerError::SpecError(e)) => { + let error_key = match e { + SpecError::ParsingError(_) => "## GraphQLParseFailure\n", + _ => "## GraphQLValidationFailure\n", + }; + if let Err(inner_e) = context.insert( + USAGE_REPORTING, + UsageReporting { + stats_report_key: error_key.to_string(), + referenced_fields_by_type: HashMap::new(), + }, + ) { + tracing::error!( + "usage reporting was not serializable to context, {}", + inner_e + ); + } + } + _ => {} } + + let e = Arc::new(error); + entry.insert(Err(e.clone())).await; + 
Err(CacheResolverError::RetrievalError(e).into()) } + } + } else { + let res = entry + .get() + .await + .map_err(|_| QueryPlannerError::UnhandledPlannerResult)?; - e.into() - }) - .map(|query_plan| QueryPlannerResponse::new(query_plan, request.context)) + match res { + Ok(content) => Ok(QueryPlannerResponse { content, context }), + Err(error) => { + if let Some(error) = error.downcast_ref::() { + if let QueryPlannerError::PlanningErrors(pe) = &error { + if let Err(inner_e) = request + .context + .insert(USAGE_REPORTING, pe.usage_reporting.clone()) + { + tracing::error!( + "usage reporting was not serializable to context, {}", + inner_e + ); + } + } else if let QueryPlannerError::SpecError(e) = &error { + let error_key = match e { + SpecError::ParsingError(_) => "## GraphQLParseFailure\n", + _ => "## GraphQLValidationFailure\n", + }; + if let Err(inner_e) = request.context.insert( + USAGE_REPORTING, + UsageReporting { + stats_report_key: error_key.to_string(), + referenced_fields_by_type: HashMap::new(), + }, + ) { + tracing::error!( + "usage reporting was not serializable to context, {}", + inner_e + ); + } + } + } + + Err(CacheResolverError::RetrievalError(error).into()) + } + } + } }) } } @@ -163,17 +174,17 @@ mod tests { use router_bridge::planner::PlanErrors; use router_bridge::planner::UsageReporting; use test_log::test; + use tower::Service; use super::*; - use crate::query_planner::QueryPlanOptions; mock! 
{ #[derive(Debug)] MyQueryPlanner { - fn sync_get( + fn sync_call( &self, - key: QueryKey, - ) -> PlanResult; + key: QueryPlannerRequest, + ) -> Result; } impl Clone for MyQueryPlanner { @@ -181,44 +192,64 @@ mod tests { } } - #[async_trait] - impl QueryPlanner for MockMyQueryPlanner { - async fn get(&self, key: QueryKey) -> PlanResult { - self.sync_get(key) + impl Service for MockMyQueryPlanner { + type Response = QueryPlannerResponse; + + type Error = BoxError; + + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut task::Context<'_>, + ) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: QueryPlannerRequest) -> Self::Future { + let res = self.sync_call(req); + Box::pin(async move { res }) } } #[test(tokio::test)] async fn test_plan() { let mut delegate = MockMyQueryPlanner::new(); - delegate - .expect_sync_get() - .times(2) - .return_const(Err(QueryPlannerError::from(PlanErrors { - errors: Default::default(), - usage_reporting: UsageReporting { - stats_report_key: "this is a test key".to_string(), - referenced_fields_by_type: Default::default(), - }, - }))); - - let planner = CachingQueryPlanner::new(delegate, 10).await; + delegate.expect_clone().returning(|| { + println!("cloning query planner"); + let mut planner = MockMyQueryPlanner::new(); + planner.expect_sync_call().times(0..2).returning(|_| { + println!("calling query planner"); + + Err(QueryPlannerError::from(PlanErrors { + errors: Default::default(), + usage_reporting: UsageReporting { + stats_report_key: "this is a test key".to_string(), + referenced_fields_by_type: Default::default(), + }, + }) + .into()) + }); + planner + }); + + let mut planner = CachingQueryPlanner::new(delegate, 10).await; for _ in 0..5 { assert!(planner - .get(( + .call(QueryPlannerRequest::new( "query1".into(), Some("".into()), - QueryPlanOptions::default() + Context::new() )) .await .is_err()); } assert!(planner - .get(( + .call(QueryPlannerRequest::new( 
"query2".into(), Some("".into()), - QueryPlanOptions::default() + Context::new() )) .await .is_err()); diff --git a/apollo-router/src/query_planner/mod.rs b/apollo-router/src/query_planner/mod.rs index 82bc2d9415..0808f8b23d 100644 --- a/apollo-router/src/query_planner/mod.rs +++ b/apollo-router/src/query_planner/mod.rs @@ -36,6 +36,12 @@ pub(crate) struct QueryPlanOptions { pub(crate) enable_variable_deduplication: bool, } +/// A planner key. +/// +/// This type consists of a query string, an optional operation string and the +/// [`QueryPlanOptions`]. +pub(crate) type QueryKey = (String, Option, QueryPlanOptions); + /// A plan for a given GraphQL query #[derive(Debug)] pub struct QueryPlan { diff --git a/apollo-router/src/services/router_service.rs b/apollo-router/src/services/router_service.rs index ac8082a09c..9d3286a01b 100644 --- a/apollo-router/src/services/router_service.rs +++ b/apollo-router/src/services/router_service.rs @@ -33,7 +33,7 @@ use crate::graphql; use crate::graphql::Response; use crate::http_ext::Request; use crate::introspection::Introspection; -use crate::layers::ServiceBuilderExt; +use crate::layers::DEFAULT_BUFFER_SIZE; use crate::plugin::DynPlugin; use crate::plugin::Plugin; use crate::query_planner::BridgeQueryPlanner; @@ -212,9 +212,9 @@ where let status_code = match error.downcast_ref::() { Some(crate::error::CacheResolverError::RetrievalError(retrieval_error)) if matches!( - retrieval_error.deref(), - QueryPlannerError::SpecError(_) - | QueryPlannerError::SchemaValidationErrors(_) + retrieval_error.deref().downcast_ref::(), + Some(QueryPlannerError::SpecError(_)) + | Some(QueryPlannerError::SchemaValidationErrors(_)) ) => { StatusCode::BAD_REQUEST @@ -334,13 +334,20 @@ impl PluggableRouterServiceBuilder { BridgeQueryPlanner::new(self.schema.clone(), introspection, configuration) .await .map_err(ServiceBuildError::QueryPlannerError)?; - let query_planner_service = ServiceBuilder::new().buffered().service( - 
self.plugins.iter_mut().rev().fold( - CachingQueryPlanner::new(bridge_query_planner, plan_cache_limit) - .await - .boxed(), - |acc, (_, e)| e.query_planning_service(acc), - ), + let query_planner_service = ServiceBuilder::new().service( + CachingQueryPlanner::new( + Buffer::new( + self.plugins + .iter_mut() + .rev() + .fold(bridge_query_planner.boxed(), |acc, (_, e)| { + e.query_planning_service(acc) + }), + DEFAULT_BUFFER_SIZE, + ), + plan_cache_limit, + ) + .await, ); let plugins = Arc::new(self.plugins); @@ -365,9 +372,11 @@ impl PluggableRouterServiceBuilder { /// A collection of services and data which may be used to create a "router". #[derive(Clone)] pub struct RouterCreator { - query_planner_service: Buffer< - BoxService, - QueryPlannerRequest, + query_planner_service: CachingQueryPlanner< + Buffer< + BoxService, + QueryPlannerRequest, + >, >, subgraph_creator: Arc, schema: Arc, diff --git a/apollo-router/src/traits.rs b/apollo-router/src/traits.rs deleted file mode 100644 index 6062f9d1bf..0000000000 --- a/apollo-router/src/traits.rs +++ /dev/null @@ -1,32 +0,0 @@ -use async_trait::async_trait; - -use crate::error::CacheResolverError; -use crate::error::QueryPlannerError; -use crate::query_planner::QueryPlanOptions; -use crate::services::QueryPlannerContent; - -/// A cache resolution trait. -/// -/// Clients of CachingMap are required to provider a resolver during Map creation. The resolver -/// will be used to find values for cache misses. A Result is expected, because retrieval may fail. -#[async_trait] -pub(crate) trait CacheResolver { - async fn retrieve(&self, key: K) -> Result; -} - -/// A planner key. -/// -/// This type consists of a query string, an optional operation string and the -/// [`QueryPlanOptions`]. -pub(crate) type QueryKey = (String, Option, QueryPlanOptions); - -/// QueryPlanner can be used to plan queries. -/// -/// Implementations may cache query plans. 
-#[async_trait] -pub(crate) trait QueryPlanner: Send + Sync { - /// Returns a query plan given the query, operation and options. - /// Implementations may cache query plans. - #[must_use = "query plan result must be used"] - async fn get(&self, key: QueryKey) -> Result; -} diff --git a/apollo-router/tests/jaeger_test.rs b/apollo-router/tests/jaeger_test.rs index 62202c1843..be98f25c34 100644 --- a/apollo-router/tests/jaeger_test.rs +++ b/apollo-router/tests/jaeger_test.rs @@ -160,7 +160,7 @@ fn verify_spans_present(trace: &Value) -> Result<(), BoxError> { "router", "fetch", //"parse_query", Parse query will only happen once - "query_planning", + //"query_planning", query planning will only happen once "subgraph", "client_request", ]