From df7bfd6cc1078eb5fc5b18df0fd025b0c286eb2c Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 10 Apr 2025 11:00:17 -0600 Subject: [PATCH 01/13] don't re-parse document --- .../query_planner/caching_query_planner.rs | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 6ed2f13710..bb0d2dc0e7 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -316,26 +316,6 @@ where }) .await; if entry.is_first() { - let doc = loop { - match query_analysis - .parse_document(&query, operation_name.as_deref()) - .await - { - Ok(doc) => break doc, - Err(MaybeBackPressureError::PermanentError(error)) => { - let e = Arc::new(QueryPlannerError::SpecError(error)); - tokio::spawn(async move { - entry.insert(Err(e)).await; - }); - continue 'all_cache_keys_loop; - } - Err(MaybeBackPressureError::TemporaryError(ComputeBackPressureError)) => { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - // try again - } - } - }; - loop { let request = QueryPlannerRequest { query: query.clone(), From 3bba9a4e48c5a105f8f18f084e4aafcc91276eac Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 10 Apr 2025 11:10:36 -0600 Subject: [PATCH 02/13] de-prioritize query parsing during warmup --- apollo-router/src/compute_job/mod.rs | 8 +++++--- apollo-router/src/query_planner/caching_query_planner.rs | 7 ++++++- apollo-router/src/services/layers/query_analysis.rs | 8 ++++++-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/apollo-router/src/compute_job/mod.rs b/apollo-router/src/compute_job/mod.rs index c16ac3b07a..e0ce1ce377 100644 --- a/apollo-router/src/compute_job/mod.rs +++ b/apollo-router/src/compute_job/mod.rs @@ -109,14 +109,16 @@ pub(crate) enum ComputeJobType { 
QueryParsing, QueryPlanning, Introspection, + QueryParsingWarmup, } impl From for Priority { fn from(job_type: ComputeJobType) -> Self { match job_type { - ComputeJobType::QueryPlanning => Self::P8, // high - ComputeJobType::QueryParsing => Self::P4, // medium - ComputeJobType::Introspection => Self::P1, // low + ComputeJobType::QueryPlanning => Self::P8, // high + ComputeJobType::QueryParsing => Self::P4, // medium + ComputeJobType::Introspection => Self::P1, // low + ComputeJobType::QueryParsingWarmup => Self::P1, // low } } } diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index bb0d2dc0e7..39a6db3cf4 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -24,6 +24,7 @@ use crate::cache::estimate_size; use crate::cache::storage::InMemoryCache; use crate::cache::storage::ValueType; use crate::compute_job::ComputeBackPressureError; +use crate::compute_job::ComputeJobType; use crate::compute_job::MaybeBackPressureError; use crate::configuration::PersistedQueriesPrewarmQueryPlanCache; use crate::error::CacheResolverError; @@ -274,7 +275,11 @@ where } in all_cache_keys { let doc = match query_analysis - .parse_document(&query, operation_name.as_deref()) + .parse_document( + &query, + operation_name.as_deref(), + ComputeJobType::QueryParsingWarmup, + ) .await { Ok(doc) => doc, diff --git a/apollo-router/src/services/layers/query_analysis.rs b/apollo-router/src/services/layers/query_analysis.rs index 1cc2d2bcaa..c36c7ee8af 100644 --- a/apollo-router/src/services/layers/query_analysis.rs +++ b/apollo-router/src/services/layers/query_analysis.rs @@ -116,6 +116,7 @@ impl QueryAnalysisLayer { &self, query: &str, operation_name: Option<&str>, + compute_job_type: ComputeJobType, ) -> Result> { let query = query.to_string(); let operation_name = operation_name.map(|o| o.to_string()); @@ -125,7 +126,7 @@ impl 
QueryAnalysisLayer { // Must be created *outside* of the compute_job or the span is not connected to the parent let span = tracing::info_span!(QUERY_PARSING_SPAN_NAME, "otel.kind" = "INTERNAL"); - compute_job::execute(ComputeJobType::QueryParsing, move |_| { + compute_job::execute(compute_job_type, move |_| { span.in_scope(|| { Query::parse_document( &query, @@ -281,7 +282,10 @@ impl QueryAnalysisLayer { .cloned(); let res = match entry { - None => match self.parse_document(&query, op_name.as_deref()).await { + None => match self + .parse_document(&query, op_name.as_deref(), ComputeJobType::QueryParsing) + .await + { Err(e) => { if let MaybeBackPressureError::PermanentError(errors) = &e { (*self.cache.lock().await).put( From 421f3a2ca34bdc82d7885b1f977aba537f15e116 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 10 Apr 2025 11:14:40 -0600 Subject: [PATCH 03/13] parse doc in a loop --- .../query_planner/caching_query_planner.rs | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 39a6db3cf4..d2ead2134a 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -274,16 +274,24 @@ where config_mode_hash: _, } in all_cache_keys { - let doc = match query_analysis - .parse_document( - &query, - operation_name.as_deref(), - ComputeJobType::QueryParsingWarmup, - ) - .await - { - Ok(doc) => doc, - Err(_) => continue, + let doc = loop { + match query_analysis + .parse_document( + &query, + operation_name.as_deref(), + ComputeJobType::QueryParsingWarmup, + ) + .await + { + Ok(doc) => break doc, + Err(MaybeBackPressureError::PermanentError(_)) => { + continue 'all_cache_keys_loop; + } + Err(MaybeBackPressureError::TemporaryError(ComputeBackPressureError)) => { + 
tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // try again + } + } }; let caching_key = CachingQueryKey { From ad3d94367ddd02665c023f4593ec77cde3e535d3 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 10 Apr 2025 11:22:22 -0600 Subject: [PATCH 04/13] comment re priorities --- apollo-router/src/query_planner/caching_query_planner.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index d2ead2134a..066f5fc682 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -274,6 +274,9 @@ where config_mode_hash: _, } in all_cache_keys { + // NB: warmup query parsing has a very low priority so that real requests are + // prioritized. However, warmup query planning maintains its normal priority so + // that warmup doesn't take an excessively long time. 
let doc = loop { match query_analysis .parse_document( From 95caa1181253e10a1597ba71fe81a6d17190ff33 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Tue, 15 Apr 2025 13:06:03 -0400 Subject: [PATCH 05/13] Thread ComputeJobType through caching query planner --- apollo-router/src/compute_job/mod.rs | 10 ++++--- .../cost_calculator/static_cost.rs | 2 ++ .../query_planner/caching_query_planner.rs | 6 ++--- .../query_planner/query_planner_service.rs | 27 ++++++++++++++----- apollo-router/src/services/query_planner.rs | 4 +++ 5 files changed, 36 insertions(+), 13 deletions(-) diff --git a/apollo-router/src/compute_job/mod.rs b/apollo-router/src/compute_job/mod.rs index e0ce1ce377..35996b33f0 100644 --- a/apollo-router/src/compute_job/mod.rs +++ b/apollo-router/src/compute_job/mod.rs @@ -110,15 +110,17 @@ pub(crate) enum ComputeJobType { QueryPlanning, Introspection, QueryParsingWarmup, + QueryPlanningWarmup, } impl From for Priority { fn from(job_type: ComputeJobType) -> Self { match job_type { - ComputeJobType::QueryPlanning => Self::P8, // high - ComputeJobType::QueryParsing => Self::P4, // medium - ComputeJobType::Introspection => Self::P1, // low - ComputeJobType::QueryParsingWarmup => Self::P1, // low + ComputeJobType::QueryPlanning => Self::P8, // high + ComputeJobType::QueryParsing => Self::P4, // medium + ComputeJobType::Introspection => Self::P1, // low + ComputeJobType::QueryParsingWarmup => Self::P1, // low + ComputeJobType::QueryPlanningWarmup => Self::P2, // low } } } diff --git a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs index f35c9d5094..f6f89e701e 100644 --- a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs +++ b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs @@ -644,6 +644,7 @@ mod tests { use crate::Configuration; use crate::Context; use 
crate::assert_snapshot_subscriber; + use crate::compute_job::ComputeJobType; use crate::plugins::authorization::CacheKeyMetadata; use crate::query_planner::QueryPlannerService; use crate::services::QueryPlannerContent; @@ -745,6 +746,7 @@ mod tests { query, CacheKeyMetadata::default(), PlanOptions::default(), + ComputeJobType::QueryPlanning, )) .await .unwrap(); diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 066f5fc682..458970d99a 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -274,9 +274,7 @@ where config_mode_hash: _, } in all_cache_keys { - // NB: warmup query parsing has a very low priority so that real requests are - // prioritized. However, warmup query planning maintains its normal priority so - // that warmup doesn't take an excessively long time. + // NB: warmup tasks have a low priority so that real requests are prioritized let doc = loop { match query_analysis .parse_document( @@ -339,6 +337,7 @@ where document: doc.clone(), metadata: caching_key.metadata.clone(), plan_options: caching_key.plan_options.clone(), + compute_job_type: ComputeJobType::QueryPlanningWarmup, }; let res = match service.ready().await { Ok(service) => service.call(request).await, @@ -509,6 +508,7 @@ where .document(doc) .metadata(caching_key.metadata) .plan_options(caching_key.plan_options) + .compute_job_type(ComputeJobType::QueryPlanning) .build(); // some clients might timeout and cancel the request before query planning is finished, diff --git a/apollo-router/src/query_planner/query_planner_service.rs b/apollo-router/src/query_planner/query_planner_service.rs index 892bd23ee1..bf46df944d 100644 --- a/apollo-router/src/query_planner/query_planner_service.rs +++ b/apollo-router/src/query_planner/query_planner_service.rs @@ -145,6 +145,7 @@ impl QueryPlannerService { doc: &ParsedDocument, operation: Option, 
plan_options: PlanOptions, + compute_job_type: ComputeJobType, // Initialization code that needs mutable access to the plan, // before we potentially share it in Arc with a background thread // for "both" mode. @@ -188,7 +189,7 @@ impl QueryPlannerService { let root_node = convert_root_query_plan_node(&plan); Ok((plan, root_node)) }; - let (plan, mut root_node) = compute_job::execute(ComputeJobType::QueryPlanning, job) + let (plan, mut root_node) = compute_job::execute(compute_job_type, job) .map_err(MaybeBackPressureError::TemporaryError)? .await?; if let Some(node) = &mut root_node { @@ -301,14 +302,21 @@ impl QueryPlannerService { selections: Query, plan_options: PlanOptions, doc: &ParsedDocument, + compute_job_type: ComputeJobType, query_metrics: OperationLimits, ) -> Result> { let plan_result = self - .plan_inner(doc, operation.clone(), plan_options, |root_node| { - root_node.init_parsed_operations_and_hash_subqueries(&self.subgraph_schemas)?; - root_node.extract_authorization_metadata(self.schema.supergraph_schema(), &key); - Ok(()) - }) + .plan_inner( + doc, + operation.clone(), + plan_options, + compute_job_type, + |root_node| { + root_node.init_parsed_operations_and_hash_subqueries(&self.subgraph_schemas)?; + root_node.extract_authorization_metadata(self.schema.supergraph_schema(), &key); + Ok(()) + }, + ) .await?; let QueryPlanResult { query_plan_root_node, @@ -387,6 +395,7 @@ impl Service for QueryPlannerService { document, metadata, plan_options, + compute_job_type, } = req; let this = self.clone(); @@ -432,6 +441,7 @@ impl Service for QueryPlannerService { plan_options, }, doc, + compute_job_type, ) .await; @@ -462,6 +472,7 @@ impl QueryPlannerService { &self, mut key: QueryKey, mut doc: ParsedDocument, + compute_job_type: ComputeJobType, ) -> Result> { let mut query_metrics = Default::default(); let mut selections = self @@ -568,6 +579,7 @@ impl QueryPlannerService { selections, key.plan_options, &doc, + compute_job_type, query_metrics, ) .await @@ 
-718,6 +730,7 @@ mod tests { selections, PlanOptions::default(), &doc, + ComputeJobType::QueryPlanning, query_metrics ) .await @@ -1070,6 +1083,7 @@ mod tests { plan_options: PlanOptions::default(), }, doc, + ComputeJobType::QueryPlanning, ) .await .unwrap(); @@ -1126,6 +1140,7 @@ mod tests { plan_options, }, doc, + ComputeJobType::QueryPlanning, ) .await; match result { diff --git a/apollo-router/src/services/query_planner.rs b/apollo-router/src/services/query_planner.rs index 3e1e809aed..336712a732 100644 --- a/apollo-router/src/services/query_planner.rs +++ b/apollo-router/src/services/query_planner.rs @@ -10,6 +10,7 @@ use static_assertions::assert_impl_all; use super::layers::query_analysis::ParsedDocument; use crate::Context; +use crate::compute_job::ComputeJobType; use crate::compute_job::MaybeBackPressureError; use crate::error::QueryPlannerError; use crate::graphql; @@ -33,6 +34,7 @@ pub(crate) struct Request { pub(crate) document: ParsedDocument, pub(crate) metadata: crate::plugins::authorization::CacheKeyMetadata, pub(crate) plan_options: PlanOptions, + pub(crate) compute_job_type: ComputeJobType, } #[buildstructor::buildstructor] @@ -47,6 +49,7 @@ impl Request { document: ParsedDocument, metadata: crate::plugins::authorization::CacheKeyMetadata, plan_options: PlanOptions, + compute_job_type: ComputeJobType, ) -> Request { Self { query, @@ -54,6 +57,7 @@ impl Request { document, metadata, plan_options, + compute_job_type, } } } From 1db4e2adab3e09a1b817fb52ea1479f73958e018 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Tue, 15 Apr 2025 13:21:09 -0400 Subject: [PATCH 06/13] Add changeset --- .changesets/feat_caroline_low_priority_warmup.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changesets/feat_caroline_low_priority_warmup.md diff --git a/.changesets/feat_caroline_low_priority_warmup.md b/.changesets/feat_caroline_low_priority_warmup.md new file mode 100644 index 0000000000..89bdde8a6e 
--- /dev/null +++ b/.changesets/feat_caroline_low_priority_warmup.md @@ -0,0 +1,6 @@ +### De-prioritize warm-up process query parsing and planning ([PR #7223](https://github.com/apollographql/router/pull/7223)) + +The router warms up its query planning cache after a schema or configuration change. This change decreases the priority +of warm up tasks in the compute job queue, to reduce the impact of warmup on serving requests. + +By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/7223 From e79a0e9714989af59bdfd61a2fcba2ead03582d6 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Wed, 16 Apr 2025 13:21:59 -0400 Subject: [PATCH 07/13] Test that compute queue respects priority levels --- apollo-router/src/compute_job/mod.rs | 79 ++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 5 deletions(-) diff --git a/apollo-router/src/compute_job/mod.rs b/apollo-router/src/compute_job/mod.rs index a45c59ea9d..71be8020df 100644 --- a/apollo-router/src/compute_job/mod.rs +++ b/apollo-router/src/compute_job/mod.rs @@ -321,14 +321,23 @@ mod tests { use super::*; use crate::assert_snapshot_subscriber; - #[tokio::test] - async fn test_observability() { - // make sure that the queue has been initialized by calling `execute`. if this - // step is skipped, the queue will _sometimes_ be initialized in the step below, - // which causes an additional log line and a snapshot mismatch. + /// Send a request to the compute queue to make sure it is initialized. + /// + /// The queue is (a) wrapped in a `OnceLock`, so it is shared between tests, and (b) only + /// initialized after receiving and processing a request. + /// These two properties can lead to inconsistent behavior. 
+ async fn ensure_queue_is_initialized() { execute(ComputeJobType::Introspection, |_| {}) .unwrap() .await; + } + + #[tokio::test] + async fn test_observability() { + // make sure that the queue has been initialized - if this step is skipped, the + // queue will _sometimes_ be initialized in the step below, which causes an + // additional log line and a snapshot mismatch. + ensure_queue_is_initialized().await; async { let span = info_span!("test_observability"); @@ -402,4 +411,64 @@ mod tests { e => panic!("job did not cancel as expected: {e:?}"), }; } + + #[tokio::test] + async fn test_relative_priorities() { + let pool_size = thread_pool_size(); + ensure_queue_is_initialized().await; + + let start = Instant::now(); + let sleep_duration = Duration::from_millis(100); + let buffered_sleep_duration = Duration::from_millis(120); + + // Send in `pool_size * 3 - 1` low priority requests and 1 high priority request + // We expect the workers to begin right away, so they'll pull `pool_size` low priority + // elements from the queue and work on them. 
+ // However, once the next worker gets free, it should pull the high priority request rather + // than the remaining low priority requests + let low_priority_handles: Vec<_> = (0..pool_size * 3 - 1) + .map(|_| { + execute(ComputeJobType::QueryPlanningWarmup, move |_| { + let inner_start = start.clone(); + std::thread::sleep(sleep_duration.clone()); + inner_start.elapsed() + }) + .unwrap() + }) + .collect(); + let high_priority_handle = execute(ComputeJobType::QueryPlanning, move |_| { + let inner_start = start.clone(); + std::thread::sleep(sleep_duration.clone()); + inner_start.elapsed() + }) + .unwrap(); + + let mut low_priority_durations = + futures::future::join_all(low_priority_handles.into_iter()).await; + let high_priority_duration = high_priority_handle.await; + + // We expect: + // * `pool_size` low priority durations of `sleep_duration` + // * 1 high priority duration of `2 * sleep_duration` + // * `pool_size - 1` low priority durations of `2 * sleep_duration` + // * `pool_size` low priority durations of `3 * sleep_duration` + for _ in 0..pool_size { + let d = low_priority_durations.remove(0); + assert!(d < buffered_sleep_duration); + } + + assert!(high_priority_duration < 2 * buffered_sleep_duration); + for _ in 0..pool_size - 1 { + let d = low_priority_durations.remove(0); + assert!(d < 2 * buffered_sleep_duration); + } + + for _ in 0..pool_size { + let d = low_priority_durations.remove(0); + assert!(d < 3 * buffered_sleep_duration); + } + + assert!(low_priority_durations.is_empty()); + assert!(start.elapsed() < 3 * buffered_sleep_duration); + } } From 065e7968afd9e094c93528577a636268a773b8d1 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Wed, 16 Apr 2025 14:23:01 -0400 Subject: [PATCH 08/13] improve comment --- apollo-router/src/compute_job/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apollo-router/src/compute_job/mod.rs b/apollo-router/src/compute_job/mod.rs index 
71be8020df..6fda920cbb 100644 --- a/apollo-router/src/compute_job/mod.rs +++ b/apollo-router/src/compute_job/mod.rs @@ -421,7 +421,8 @@ mod tests { let sleep_duration = Duration::from_millis(100); let buffered_sleep_duration = Duration::from_millis(120); - // Send in `pool_size * 3 - 1` low priority requests and 1 high priority request + // Send in `pool_size * 3 - 1` low priority requests and 1 high priority request after the + // low priority requests. // We expect the workers to begin right away, so they'll pull `pool_size` low priority // elements from the queue and work on them. // However, once the next worker gets free, it should pull the high priority request rather From eabbac4fe874d43e85aa3837bf4ab6f4cd679962 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Wed, 16 Apr 2025 16:30:10 -0400 Subject: [PATCH 09/13] Add warm up test --- apollo-router/src/cache/mod.rs | 5 ++ apollo-router/src/compute_job/mod.rs | 8 +- .../query_planner/caching_query_planner.rs | 82 +++++++++++++++++++ 3 files changed, 91 insertions(+), 4 deletions(-) diff --git a/apollo-router/src/cache/mod.rs b/apollo-router/src/cache/mod.rs index 7d01cf2449..11f9c52aa1 100644 --- a/apollo-router/src/cache/mod.rs +++ b/apollo-router/src/cache/mod.rs @@ -154,6 +154,11 @@ where pub(crate) fn activate(&self) { self.storage.activate() } + + #[cfg(test)] + pub(crate) async fn len(&self) -> usize { + self.storage.len().await + } } pub(crate) struct Entry { diff --git a/apollo-router/src/compute_job/mod.rs b/apollo-router/src/compute_job/mod.rs index 6fda920cbb..0f6c39a95e 100644 --- a/apollo-router/src/compute_job/mod.rs +++ b/apollo-router/src/compute_job/mod.rs @@ -430,16 +430,16 @@ mod tests { let low_priority_handles: Vec<_> = (0..pool_size * 3 - 1) .map(|_| { execute(ComputeJobType::QueryPlanningWarmup, move |_| { - let inner_start = start.clone(); - std::thread::sleep(sleep_duration.clone()); + let inner_start = start; + 
std::thread::sleep(sleep_duration); inner_start.elapsed() }) .unwrap() }) .collect(); let high_priority_handle = execute(ComputeJobType::QueryPlanning, move |_| { - let inner_start = start.clone(); - std::thread::sleep(sleep_duration.clone()); + let inner_start = start; + std::thread::sleep(sleep_duration); inner_start.elapsed() }) .unwrap(); diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 1ad52f6a8d..2c481101a8 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -722,6 +722,7 @@ impl ValueType for Result> { mod tests { use mockall::mock; use serde_json_bytes::json; + use std::time::Duration; use test_log::test; use tower::Service; @@ -1173,4 +1174,85 @@ mod tests { panic!("Expected both calls to return same error"); } } + + #[tokio::test] + async fn test_cache_warmup() { + let create_delegate = |call_count| { + let mut delegate = MockMyQueryPlanner::new(); + delegate.expect_clone().times(1).returning(move || { + let mut planner = MockMyQueryPlanner::new(); + planner.expect_sync_call().times(call_count).returning(|_| { + let plan = Arc::new(QueryPlan::fake_new(None, None)); + Ok(QueryPlannerResponse::builder() + .content(QueryPlannerContent::Plan { plan }) + .build()) + }); + planner + }); + delegate + }; + + let configuration: Configuration = Default::default(); + let schema = Arc::new( + Schema::parse( + include_str!("../testdata/starstuff@current.graphql"), + &configuration, + ) + .unwrap(), + ); + + let create_planner = async |delegate| { + CachingQueryPlanner::new( + delegate, + schema.clone(), + Default::default(), + &configuration, + IndexMap::default(), + ) + .await + .unwrap() + }; + + let create_request = || { + let query_str = "query ExampleQuery { me { name } }".to_string(); + let doc = Query::parse_document(&query_str, None, &schema, &configuration).unwrap(); + let context = 
Context::new(); + context + .extensions() + .with_lock(|lock| lock.insert::(doc)); + query_planner::CachingRequest::new(query_str, None, context) + }; + + // send query to caching planner. it should save this query plan in its cache + let mut planner = create_planner(create_delegate(1)).await; + let response = planner.call(create_request()).await.unwrap(); + assert!(response.content.is_some()); + assert_eq!(planner.cache.len().await, 1); + + // create and warm up a new planner. new planner's delegate should be called once during + // the warm-up phase to populate the cache + let query_analysis_layer = + QueryAnalysisLayer::new(schema.clone(), Arc::new(configuration.clone())).await; + let mut new_planner = create_planner(create_delegate(1)).await; + new_planner + .warm_up( + &query_analysis_layer, + &Arc::new(PersistedQueryLayer::new(&configuration).await.unwrap()), + Some(planner.previous_cache()), + Some(1), + Default::default(), + &Default::default(), + ) + .await; + // wait a beat - items are added to cache asynchronously, so this helps avoid flakiness + tokio::time::sleep(Duration::from_millis(10)).await; + assert_eq!(new_planner.cache.len().await, 1); + + // create a new delegate that _shouldn't_ be called since the new planner already has the + // result in its cache + new_planner.delegate = create_delegate(0); + let response = new_planner.call(create_request()).await.unwrap(); + assert!(response.content.is_some()); + assert_eq!(new_planner.cache.len().await, 1); + } } From 6895dd1d3b6f234c88386490f67af8fd7c51c1dc Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Wed, 16 Apr 2025 16:40:24 -0400 Subject: [PATCH 10/13] forgot to rerun formatter --- apollo-router/src/query_planner/caching_query_planner.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 2c481101a8..26649a6ab6 
100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -720,9 +720,10 @@ impl ValueType for Result> { #[cfg(test)] mod tests { + use std::time::Duration; + use mockall::mock; use serde_json_bytes::json; - use std::time::Duration; use test_log::test; use tower::Service; From 7e210443f83c96b978949dcfcfa7d8ac8cc010e0 Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 17 Apr 2025 08:34:52 -0400 Subject: [PATCH 11/13] Bump up introspection priority --- apollo-router/src/compute_job/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apollo-router/src/compute_job/mod.rs b/apollo-router/src/compute_job/mod.rs index 0f6c39a95e..e91d46aaa5 100644 --- a/apollo-router/src/compute_job/mod.rs +++ b/apollo-router/src/compute_job/mod.rs @@ -125,7 +125,7 @@ impl From for Priority { match job_type { ComputeJobType::QueryPlanning => Self::P8, // high ComputeJobType::QueryParsing => Self::P4, // medium - ComputeJobType::Introspection => Self::P1, // low + ComputeJobType::Introspection => Self::P3, // low ComputeJobType::QueryParsingWarmup => Self::P1, // low ComputeJobType::QueryPlanningWarmup => Self::P2, // low } From 36c626543db404624f04b23a35b645fcc9e24b7a Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 17 Apr 2025 09:54:51 -0400 Subject: [PATCH 12/13] fix test to not rely on precise timings --- apollo-router/src/compute_job/mod.rs | 48 +++++++++------------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/apollo-router/src/compute_job/mod.rs b/apollo-router/src/compute_job/mod.rs index e91d46aaa5..db8396a23a 100644 --- a/apollo-router/src/compute_job/mod.rs +++ b/apollo-router/src/compute_job/mod.rs @@ -418,20 +418,19 @@ mod tests { ensure_queue_is_initialized().await; let start = Instant::now(); - let sleep_duration = Duration::from_millis(100); - let 
buffered_sleep_duration = Duration::from_millis(120); - // Send in `pool_size * 3 - 1` low priority requests and 1 high priority request after the + // Send in `pool_size * 2 - 1` low priority requests and 1 high priority request after the // low priority requests. - // We expect the workers to begin right away, so they'll pull `pool_size` low priority - // elements from the queue and work on them. - // However, once the next worker gets free, it should pull the high priority request rather - // than the remaining low priority requests - let low_priority_handles: Vec<_> = (0..pool_size * 3 - 1) + // If the queue were isolated, we'd expect `pool_size` low priority requests to complete + // before the high priority requests, since the workers would start on the low priority + // requests immediately. + // But, the queue is not isolated. This loosens our guarantees - we expect _up to_ `pool_size` + // low priority requests to complete before the high priority request. + let low_priority_handles: Vec<_> = (0..pool_size * 2 - 1) .map(|_| { execute(ComputeJobType::QueryPlanningWarmup, move |_| { let inner_start = start; - std::thread::sleep(sleep_duration); + std::thread::sleep(Duration::from_millis(10)); inner_start.elapsed() }) .unwrap() @@ -439,37 +438,20 @@ mod tests { .collect(); let high_priority_handle = execute(ComputeJobType::QueryPlanning, move |_| { let inner_start = start; - std::thread::sleep(sleep_duration); + std::thread::sleep(Duration::from_millis(5)); inner_start.elapsed() }) .unwrap(); - let mut low_priority_durations = + let low_priority_durations = futures::future::join_all(low_priority_handles.into_iter()).await; let high_priority_duration = high_priority_handle.await; - // We expect: - // * `pool_size` low priority durations of `sleep_duration` - // * 1 high priority duration of `2 * sleep_duration` - // * `pool_size - 1` low priority durations of `2 * sleep_duration` - // * `pool_size` low priority durations of `3 * sleep_duration` - for _ in 
0..pool_size { - let d = low_priority_durations.remove(0); - assert!(d < buffered_sleep_duration); - } - - assert!(high_priority_duration < 2 * buffered_sleep_duration); - for _ in 0..pool_size - 1 { - let d = low_priority_durations.remove(0); - assert!(d < 2 * buffered_sleep_duration); - } - - for _ in 0..pool_size { - let d = low_priority_durations.remove(0); - assert!(d < 3 * buffered_sleep_duration); - } + let low_before_high_count = low_priority_durations + .iter() + .filter(|d| d < &&high_priority_duration) + .count(); - assert!(low_priority_durations.is_empty()); - assert!(start.elapsed() < 3 * buffered_sleep_duration); + assert!(low_before_high_count <= pool_size); } } From 0e3579313232d8336545218a0bbfcc6e9ef3ec1f Mon Sep 17 00:00:00 2001 From: carodewig <16093297+carodewig@users.noreply.github.com> Date: Thu, 17 Apr 2025 10:45:47 -0400 Subject: [PATCH 13/13] document new job types in the changeset --- .changesets/feat_caroline_low_priority_warmup.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.changesets/feat_caroline_low_priority_warmup.md b/.changesets/feat_caroline_low_priority_warmup.md index 89bdde8a6e..5bd6f3b858 100644 --- a/.changesets/feat_caroline_low_priority_warmup.md +++ b/.changesets/feat_caroline_low_priority_warmup.md @@ -3,4 +3,16 @@ The router warms up its query planning cache after a schema or configuration change. This change decreases the priority of warm up tasks in the compute job queue, to reduce the impact of warmup on serving requests. +This change adds new values to the `job.type` dimension of the following metrics: +- `apollo.router.compute_jobs.duration` - A histogram of time spent in the compute pipeline by the job, including the queue and query planning. 
+ - `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**) + - `job.outcome`: (`executed_ok`, `executed_error`, `channel_error`, `rejected_queue_full`, `abandoned`) +- `apollo.router.compute_jobs.queue.wait.duration` - A histogram of time spent in the compute queue by the job. + - `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**) +- `apollo.router.compute_jobs.execution.duration` - A histogram of time spent executing the job (excludes time spent in the queue). + - `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**) +- `apollo.router.compute_jobs.active_jobs` - A gauge of the number of compute jobs being processed in parallel. + - `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**) + + By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/7223