diff --git a/.changesets/feat_caroline_low_priority_warmup.md b/.changesets/feat_caroline_low_priority_warmup.md new file mode 100644 index 0000000000..5bd6f3b858 --- /dev/null +++ b/.changesets/feat_caroline_low_priority_warmup.md @@ -0,0 +1,18 @@ +### De-prioritize warm-up process query parsing and planning ([PR #7223](https://github.com/apollographql/router/pull/7223)) + +The router warms up its query planning cache after a schema or configuration change. This change decreases the priority +of warm-up tasks in the compute job queue, to reduce the impact of warm-up on serving requests. + +This change adds new values to the `job.type` dimension of the following metrics: +- `apollo.router.compute_jobs.duration` - A histogram of time spent in the compute pipeline by the job, including the queue and query planning. + - `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**) + - `job.outcome`: (`executed_ok`, `executed_error`, `channel_error`, `rejected_queue_full`, `abandoned`) +- `apollo.router.compute_jobs.queue.wait.duration` - A histogram of time spent in the compute queue by the job. + - `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**) +- `apollo.router.compute_jobs.execution.duration` - A histogram of time spent executing a job (excludes time spent in the queue). + - `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**) +- `apollo.router.compute_jobs.active_jobs` - A gauge of the number of compute jobs being processed in parallel. 
+ - `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**) + + +By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/7223 diff --git a/apollo-router/src/cache/mod.rs b/apollo-router/src/cache/mod.rs index 7d01cf2449..11f9c52aa1 100644 --- a/apollo-router/src/cache/mod.rs +++ b/apollo-router/src/cache/mod.rs @@ -154,6 +154,11 @@ where pub(crate) fn activate(&self) { self.storage.activate() } + + #[cfg(test)] + pub(crate) async fn len(&self) -> usize { + self.storage.len().await + } } pub(crate) struct Entry { diff --git a/apollo-router/src/compute_job/mod.rs b/apollo-router/src/compute_job/mod.rs index 6001d25413..ac77578d18 100644 --- a/apollo-router/src/compute_job/mod.rs +++ b/apollo-router/src/compute_job/mod.rs @@ -117,14 +117,18 @@ pub(crate) enum ComputeJobType { QueryParsing, QueryPlanning, Introspection, + QueryParsingWarmup, + QueryPlanningWarmup, } impl From for Priority { fn from(job_type: ComputeJobType) -> Self { match job_type { - ComputeJobType::QueryPlanning => Self::P8, // high - ComputeJobType::QueryParsing => Self::P4, // medium - ComputeJobType::Introspection => Self::P1, // low + ComputeJobType::QueryPlanning => Self::P8, // high + ComputeJobType::QueryParsing => Self::P4, // medium + ComputeJobType::Introspection => Self::P3, // low + ComputeJobType::QueryParsingWarmup => Self::P1, // low + ComputeJobType::QueryPlanningWarmup => Self::P2, // low } } } @@ -318,14 +322,23 @@ mod tests { use super::*; use crate::assert_snapshot_subscriber; - #[tokio::test] - async fn test_observability() { - // make sure that the queue has been initialized by calling `execute`. if this - // step is skipped, the queue will _sometimes_ be initialized in the step below, - // which causes an additional log line and a snapshot mismatch. + /// Send a request to the compute queue to make sure it is initialized. 
+ /// + /// The queue is (a) wrapped in a `OnceLock`, so it is shared between tests, and (b) only + /// initialized after receiving and processing a request. + /// These two properties can lead to inconsistent behavior. + async fn ensure_queue_is_initialized() { execute(ComputeJobType::Introspection, |_| {}) .unwrap() .await; + } + + #[tokio::test] + async fn test_observability() { + // make sure that the queue has been initialized - if this step is skipped, the + // queue will _sometimes_ be initialized in the step below, which causes an + // additional log line and a snapshot mismatch. + ensure_queue_is_initialized().await; async { let span = info_span!("test_observability"); @@ -399,4 +412,47 @@ mod tests { e => panic!("job did not cancel as expected: {e:?}"), }; } + + #[tokio::test] + async fn test_relative_priorities() { + let pool_size = thread_pool_size(); + ensure_queue_is_initialized().await; + + let start = Instant::now(); + + // Send in `pool_size * 2 - 1` low priority requests and 1 high priority request after the + // low priority requests. + // If the queue were isolated, we'd expect `pool_size` low priority requests to complete + // before the high priority requests, since the workers would start on the low priority + // requests immediately. + // But, the queue is not isolated. This loosens our guarantees - we expect _up to_ `pool_size` + // low priority requests to complete before the high priority request. 
+ let low_priority_handles: Vec<_> = (0..pool_size * 2 - 1) + .map(|_| { + execute(ComputeJobType::QueryPlanningWarmup, move |_| { + let inner_start = start; + std::thread::sleep(Duration::from_millis(10)); + inner_start.elapsed() + }) + .unwrap() + }) + .collect(); + let high_priority_handle = execute(ComputeJobType::QueryPlanning, move |_| { + let inner_start = start; + std::thread::sleep(Duration::from_millis(5)); + inner_start.elapsed() + }) + .unwrap(); + + let low_priority_durations = + futures::future::join_all(low_priority_handles.into_iter()).await; + let high_priority_duration = high_priority_handle.await; + + let low_before_high_count = low_priority_durations + .iter() + .filter(|d| d < &&high_priority_duration) + .count(); + + assert!(low_before_high_count <= pool_size); + } } diff --git a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs index 1def4a8722..1fcf353355 100644 --- a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs +++ b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs @@ -644,6 +644,7 @@ mod tests { use crate::Configuration; use crate::Context; use crate::assert_snapshot_subscriber; + use crate::compute_job::ComputeJobType; use crate::plugins::authorization::CacheKeyMetadata; use crate::query_planner::QueryPlannerService; use crate::services::QueryPlannerContent; @@ -745,6 +746,7 @@ mod tests { query, CacheKeyMetadata::default(), PlanOptions::default(), + ComputeJobType::QueryPlanning, )) .await .unwrap(); diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index c8ad7c392b..26649a6ab6 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -24,6 +24,7 @@ use crate::cache::estimate_size; use crate::cache::storage::InMemoryCache; use 
crate::cache::storage::ValueType; use crate::compute_job::ComputeBackPressureError; +use crate::compute_job::ComputeJobType; use crate::compute_job::MaybeBackPressureError; use crate::configuration::PersistedQueriesPrewarmQueryPlanCache; use crate::error::CacheResolverError; @@ -273,12 +274,25 @@ where config_mode_hash: _, } in all_cache_keys { - let doc = match query_analysis - .parse_document(&query, operation_name.as_deref()) - .await - { - Ok(doc) => doc, - Err(_) => continue, + // NB: warmup tasks have a low priority so that real requests are prioritized + let doc = loop { + match query_analysis + .parse_document( + &query, + operation_name.as_deref(), + ComputeJobType::QueryParsingWarmup, + ) + .await + { + Ok(doc) => break doc, + Err(MaybeBackPressureError::PermanentError(_)) => { + continue 'all_cache_keys_loop; + } + Err(MaybeBackPressureError::TemporaryError(ComputeBackPressureError)) => { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // try again + } + } }; let caching_key = CachingQueryKey { @@ -316,26 +330,6 @@ where }) .await; if entry.is_first() { - let doc = loop { - match query_analysis - .parse_document(&query, operation_name.as_deref()) - .await - { - Ok(doc) => break doc, - Err(MaybeBackPressureError::PermanentError(error)) => { - let e = Arc::new(QueryPlannerError::SpecError(error)); - tokio::spawn(async move { - entry.insert(Err(e)).await; - }); - continue 'all_cache_keys_loop; - } - Err(MaybeBackPressureError::TemporaryError(ComputeBackPressureError)) => { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - // try again - } - } - }; - loop { let request = QueryPlannerRequest { query: query.clone(), @@ -343,6 +337,7 @@ where document: doc.clone(), metadata: caching_key.metadata.clone(), plan_options: caching_key.plan_options.clone(), + compute_job_type: ComputeJobType::QueryPlanningWarmup, }; let res = match service.ready().await { Ok(service) => service.call(request).await, @@ -510,6 +505,7 @@ where 
.document(doc) .metadata(caching_key.metadata) .plan_options(caching_key.plan_options) + .compute_job_type(ComputeJobType::QueryPlanning) .build(); // some clients might timeout and cancel the request before query planning is finished, @@ -724,6 +720,8 @@ impl ValueType for Result> { #[cfg(test)] mod tests { + use std::time::Duration; + use mockall::mock; use serde_json_bytes::json; use test_log::test; @@ -1177,4 +1175,85 @@ mod tests { panic!("Expected both calls to return same error"); } } + + #[tokio::test] + async fn test_cache_warmup() { + let create_delegate = |call_count| { + let mut delegate = MockMyQueryPlanner::new(); + delegate.expect_clone().times(1).returning(move || { + let mut planner = MockMyQueryPlanner::new(); + planner.expect_sync_call().times(call_count).returning(|_| { + let plan = Arc::new(QueryPlan::fake_new(None, None)); + Ok(QueryPlannerResponse::builder() + .content(QueryPlannerContent::Plan { plan }) + .build()) + }); + planner + }); + delegate + }; + + let configuration: Configuration = Default::default(); + let schema = Arc::new( + Schema::parse( + include_str!("../testdata/starstuff@current.graphql"), + &configuration, + ) + .unwrap(), + ); + + let create_planner = async |delegate| { + CachingQueryPlanner::new( + delegate, + schema.clone(), + Default::default(), + &configuration, + IndexMap::default(), + ) + .await + .unwrap() + }; + + let create_request = || { + let query_str = "query ExampleQuery { me { name } }".to_string(); + let doc = Query::parse_document(&query_str, None, &schema, &configuration).unwrap(); + let context = Context::new(); + context + .extensions() + .with_lock(|lock| lock.insert::(doc)); + query_planner::CachingRequest::new(query_str, None, context) + }; + + // send query to caching planner. 
it should save this query plan in its cache + let mut planner = create_planner(create_delegate(1)).await; + let response = planner.call(create_request()).await.unwrap(); + assert!(response.content.is_some()); + assert_eq!(planner.cache.len().await, 1); + + // create and warm up a new planner. new planner's delegate should be called once during + // the warm-up phase to populate the cache + let query_analysis_layer = + QueryAnalysisLayer::new(schema.clone(), Arc::new(configuration.clone())).await; + let mut new_planner = create_planner(create_delegate(1)).await; + new_planner + .warm_up( + &query_analysis_layer, + &Arc::new(PersistedQueryLayer::new(&configuration).await.unwrap()), + Some(planner.previous_cache()), + Some(1), + Default::default(), + &Default::default(), + ) + .await; + // wait a beat - items are added to cache asynchronously, so this helps avoid flakiness + tokio::time::sleep(Duration::from_millis(10)).await; + assert_eq!(new_planner.cache.len().await, 1); + + // create a new delegate that _shouldn't_ be called since the new planner already has the + // result in its cache + new_planner.delegate = create_delegate(0); + let response = new_planner.call(create_request()).await.unwrap(); + assert!(response.content.is_some()); + assert_eq!(new_planner.cache.len().await, 1); + } } diff --git a/apollo-router/src/query_planner/query_planner_service.rs b/apollo-router/src/query_planner/query_planner_service.rs index cce8099d15..4af76f5499 100644 --- a/apollo-router/src/query_planner/query_planner_service.rs +++ b/apollo-router/src/query_planner/query_planner_service.rs @@ -145,6 +145,7 @@ impl QueryPlannerService { doc: &ParsedDocument, operation: Option, plan_options: PlanOptions, + compute_job_type: ComputeJobType, // Initialization code that needs mutable access to the plan, // before we potentially share it in Arc with a background thread // for "both" mode. 
@@ -189,7 +190,7 @@ impl QueryPlannerService { let root_node = convert_root_query_plan_node(&plan); Ok((plan, root_node)) }; - let (plan, mut root_node) = compute_job::execute(ComputeJobType::QueryPlanning, job) + let (plan, mut root_node) = compute_job::execute(compute_job_type, job) .map_err(MaybeBackPressureError::TemporaryError)? .await?; if let Some(node) = &mut root_node { @@ -302,14 +303,21 @@ impl QueryPlannerService { selections: Query, plan_options: PlanOptions, doc: &ParsedDocument, + compute_job_type: ComputeJobType, query_metrics: OperationLimits, ) -> Result> { let plan_result = self - .plan_inner(doc, operation.clone(), plan_options, |root_node| { - root_node.init_parsed_operations_and_hash_subqueries(&self.subgraph_schemas)?; - root_node.extract_authorization_metadata(self.schema.supergraph_schema(), &key); - Ok(()) - }) + .plan_inner( + doc, + operation.clone(), + plan_options, + compute_job_type, + |root_node| { + root_node.init_parsed_operations_and_hash_subqueries(&self.subgraph_schemas)?; + root_node.extract_authorization_metadata(self.schema.supergraph_schema(), &key); + Ok(()) + }, + ) .await?; let QueryPlanResult { query_plan_root_node, @@ -388,6 +396,7 @@ impl Service for QueryPlannerService { document, metadata, plan_options, + compute_job_type, } = req; let this = self.clone(); @@ -433,6 +442,7 @@ impl Service for QueryPlannerService { plan_options, }, doc, + compute_job_type, ) .await; @@ -463,6 +473,7 @@ impl QueryPlannerService { &self, mut key: QueryKey, mut doc: ParsedDocument, + compute_job_type: ComputeJobType, ) -> Result> { let mut query_metrics = Default::default(); let mut selections = self @@ -569,6 +580,7 @@ impl QueryPlannerService { selections, key.plan_options, &doc, + compute_job_type, query_metrics, ) .await @@ -719,6 +731,7 @@ mod tests { selections, PlanOptions::default(), &doc, + ComputeJobType::QueryPlanning, query_metrics ) .await @@ -1071,6 +1084,7 @@ mod tests { plan_options: PlanOptions::default(), }, doc, + 
ComputeJobType::QueryPlanning, ) .await .unwrap(); @@ -1127,6 +1141,7 @@ mod tests { plan_options, }, doc, + ComputeJobType::QueryPlanning, ) .await; match result { diff --git a/apollo-router/src/services/layers/query_analysis.rs b/apollo-router/src/services/layers/query_analysis.rs index 7761587f10..06075927ee 100644 --- a/apollo-router/src/services/layers/query_analysis.rs +++ b/apollo-router/src/services/layers/query_analysis.rs @@ -116,6 +116,7 @@ impl QueryAnalysisLayer { &self, query: &str, operation_name: Option<&str>, + compute_job_type: ComputeJobType, ) -> Result> { let query = query.to_string(); let operation_name = operation_name.map(|o| o.to_string()); @@ -125,7 +126,7 @@ impl QueryAnalysisLayer { // Must be created *outside* of the compute_job or the span is not connected to the parent let span = tracing::info_span!(QUERY_PARSING_SPAN_NAME, "otel.kind" = "INTERNAL"); let compute_job_future = span.in_scope(||{ - compute_job::execute(ComputeJobType::QueryParsing, move |_| { + compute_job::execute(compute_job_type, move |_| { Query::parse_document( &query, operation_name.as_deref(), @@ -283,7 +284,10 @@ impl QueryAnalysisLayer { .cloned(); let res = match entry { - None => match self.parse_document(&query, op_name.as_deref()).await { + None => match self + .parse_document(&query, op_name.as_deref(), ComputeJobType::QueryParsing) + .await + { Err(e) => { if let MaybeBackPressureError::PermanentError(errors) = &e { (*self.cache.lock().await).put( diff --git a/apollo-router/src/services/query_planner.rs b/apollo-router/src/services/query_planner.rs index 3e1e809aed..336712a732 100644 --- a/apollo-router/src/services/query_planner.rs +++ b/apollo-router/src/services/query_planner.rs @@ -10,6 +10,7 @@ use static_assertions::assert_impl_all; use super::layers::query_analysis::ParsedDocument; use crate::Context; +use crate::compute_job::ComputeJobType; use crate::compute_job::MaybeBackPressureError; use crate::error::QueryPlannerError; use crate::graphql; @@ 
-33,6 +34,7 @@ pub(crate) struct Request { pub(crate) document: ParsedDocument, pub(crate) metadata: crate::plugins::authorization::CacheKeyMetadata, pub(crate) plan_options: PlanOptions, + pub(crate) compute_job_type: ComputeJobType, } #[buildstructor::buildstructor] @@ -47,6 +49,7 @@ impl Request { document: ParsedDocument, metadata: crate::plugins::authorization::CacheKeyMetadata, plan_options: PlanOptions, + compute_job_type: ComputeJobType, ) -> Request { Self { query, @@ -54,6 +57,7 @@ impl Request { document, metadata, plan_options, + compute_job_type, } } }