Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
df7bfd6
don't re-parse document
carodewig Apr 10, 2025
3bba9a4
de-prioritize query parsing during warmup
carodewig Apr 10, 2025
421f3a2
parse doc in a loop
carodewig Apr 10, 2025
ad3d943
comment re priorities
carodewig Apr 10, 2025
95caa11
Thread ComputeJobType through caching query planner
carodewig Apr 15, 2025
e26fe90
Merge branch 'dev' into caroline/low-priority-warmup
carodewig Apr 15, 2025
1db4e2a
Add changeset
carodewig Apr 15, 2025
33d9128
Merge branch 'dev' into caroline/low-priority-warmup
carodewig Apr 16, 2025
e79a0e9
Test that compute queue respects priority levels
carodewig Apr 16, 2025
065e796
improve comment
carodewig Apr 16, 2025
eabbac4
Add warm up test
carodewig Apr 16, 2025
e8bfa1c
Merge branch 'dev' into caroline/low-priority-warmup
carodewig Apr 16, 2025
6895dd1
forgot to rerun formatter
carodewig Apr 16, 2025
fcfadee
Merge branch 'caroline/low-priority-warmup' of https://github.com/apo…
carodewig Apr 16, 2025
7e21044
Bump up introspection priority
carodewig Apr 17, 2025
36c6265
fix test to not rely on precise timings
carodewig Apr 17, 2025
c59bfb6
Merge branch 'dev' into caroline/low-priority-warmup
carodewig Apr 17, 2025
0e35793
document new job types in the changeset
carodewig Apr 17, 2025
4e93ffb
Merge branch 'dev' into caroline/low-priority-warmup
carodewig Apr 24, 2025
c30a493
Merge branch 'dev' into caroline/low-priority-warmup
carodewig Apr 30, 2025
9092f55
Merge branch 'dev' into caroline/low-priority-warmup
carodewig May 5, 2025
f1c1e1e
Merge branch 'dev' into caroline/low-priority-warmup
SimonSapin May 23, 2025
bd37e15
Merge branch 'dev' into caroline/low-priority-warmup
carodewig May 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .changesets/feat_caroline_low_priority_warmup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
### De-prioritize warm-up process query parsing and planning ([PR #7223](https://github.com/apollographql/router/pull/7223))

The router warms up its query planning cache after a schema or configuration change. This change decreases the priority
of warm-up tasks in the compute job queue, reducing the impact of warm-up on serving requests.

This change adds new values to the `job.type` dimension of the following metrics:
- `apollo.router.compute_jobs.duration` - A histogram of time spent in the compute pipeline by the job, including the queue and query planning.
- `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**)
- `job.outcome`: (`executed_ok`, `executed_error`, `channel_error`, `rejected_queue_full`, `abandoned`)
- `apollo.router.compute_jobs.queue.wait.duration` - A histogram of time spent in the compute queue by the job.
- `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**)
- `apollo.router.compute_jobs.execution.duration` - A histogram of time spent executing a job (excluding time spent in the queue).
- `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**)
- `apollo.router.compute_jobs.active_jobs` - A gauge of the number of compute jobs being processed in parallel.
- `job.type`: (`query_planning`, `query_parsing`, `introspection`, **`query_planning_warmup`, `query_parsing_warmup`**)


By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/7223
5 changes: 5 additions & 0 deletions apollo-router/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ where
/// Activate this cache by delegating to the underlying storage's `activate`.
///
/// NOTE(review): the semantics of "activation" are defined by `self.storage`,
/// which is not visible here — confirm against the storage implementation.
pub(crate) fn activate(&self) {
self.storage.activate()
}

/// Test-only helper: the number of entries currently held by the underlying
/// storage (delegates to `storage.len()`).
#[cfg(test)]
pub(crate) async fn len(&self) -> usize {
self.storage.len().await
}
}

pub(crate) struct Entry<K: KeyType, V: ValueType, UncachedError> {
Expand Down
72 changes: 64 additions & 8 deletions apollo-router/src/compute_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,18 @@ pub(crate) enum ComputeJobType {
QueryParsing,
QueryPlanning,
Introspection,
QueryParsingWarmup,
QueryPlanningWarmup,
}

// Maps each compute job type to its queue priority. Work for live requests
// (planning, then parsing) outranks introspection, which in turn outranks the
// warm-up variants so that cache warm-up never delays serving traffic.
//
// The diff render had concatenated the old arms (including the stale
// `Introspection => Self::P1`) with the new ones, leaving duplicate match arms
// where the stale arm shadowed the new `P3` mapping; only the new arms remain.
impl From<ComputeJobType> for Priority {
    fn from(job_type: ComputeJobType) -> Self {
        match job_type {
            ComputeJobType::QueryPlanning => Self::P8,       // high
            ComputeJobType::QueryParsing => Self::P4,        // medium
            ComputeJobType::Introspection => Self::P3,       // low
            ComputeJobType::QueryParsingWarmup => Self::P1,  // low
            ComputeJobType::QueryPlanningWarmup => Self::P2, // low
        }
    }
}
Expand Down Expand Up @@ -318,14 +322,23 @@ mod tests {
use super::*;
use crate::assert_snapshot_subscriber;

#[tokio::test]
async fn test_observability() {
// make sure that the queue has been initialized by calling `execute`. if this
// step is skipped, the queue will _sometimes_ be initialized in the step below,
// which causes an additional log line and a snapshot mismatch.
/// Send a request to the compute queue to make sure it is initialized.
///
/// The queue is (a) wrapped in a `OnceLock`, so it is shared between tests, and (b) only
/// initialized after receiving and processing a request.
/// These two properties can lead to inconsistent behavior, so tests ping the queue with a
/// no-op job up front to force it through its lazy initialization.
async fn ensure_queue_is_initialized() {
    let job = execute(ComputeJobType::Introspection, |_| {}).unwrap();
    job.await;
}

#[tokio::test]
async fn test_observability() {
// make sure that the queue has been initialized - if this step is skipped, the
// queue will _sometimes_ be initialized in the step below, which causes an
// additional log line and a snapshot mismatch.
ensure_queue_is_initialized().await;

async {
let span = info_span!("test_observability");
Expand Down Expand Up @@ -399,4 +412,47 @@ mod tests {
e => panic!("job did not cancel as expected: {e:?}"),
};
}

#[tokio::test]
async fn test_relative_priorities() {
    let pool_size = thread_pool_size();
    ensure_queue_is_initialized().await;

    let start = Instant::now();

    // Queue `pool_size * 2 - 1` low-priority jobs, then a single high-priority
    // job after them.
    // If the queue were isolated, exactly `pool_size` low-priority jobs would
    // finish before the high-priority one, because the workers pick up the
    // low-priority jobs immediately.
    // But the queue is not isolated, which loosens the guarantee: we only
    // expect _up to_ `pool_size` low-priority jobs to finish first.
    let low_handles: Vec<_> = (0..pool_size * 2 - 1)
        .map(|_| {
            execute(ComputeJobType::QueryPlanningWarmup, move |_| {
                std::thread::sleep(Duration::from_millis(10));
                start.elapsed()
            })
            .unwrap()
        })
        .collect();
    let high_handle = execute(ComputeJobType::QueryPlanning, move |_| {
        std::thread::sleep(Duration::from_millis(5));
        start.elapsed()
    })
    .unwrap();

    let low_durations = futures::future::join_all(low_handles).await;
    let high_duration = high_handle.await;

    // Count how many low-priority jobs completed before the high-priority one.
    let finished_before_high = low_durations
        .iter()
        .filter(|duration| **duration < high_duration)
        .count();

    assert!(finished_before_high <= pool_size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ mod tests {
use crate::Configuration;
use crate::Context;
use crate::assert_snapshot_subscriber;
use crate::compute_job::ComputeJobType;
use crate::plugins::authorization::CacheKeyMetadata;
use crate::query_planner::QueryPlannerService;
use crate::services::QueryPlannerContent;
Expand Down Expand Up @@ -745,6 +746,7 @@ mod tests {
query,
CacheKeyMetadata::default(),
PlanOptions::default(),
ComputeJobType::QueryPlanning,
))
.await
.unwrap();
Expand Down
131 changes: 105 additions & 26 deletions apollo-router/src/query_planner/caching_query_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::cache::estimate_size;
use crate::cache::storage::InMemoryCache;
use crate::cache::storage::ValueType;
use crate::compute_job::ComputeBackPressureError;
use crate::compute_job::ComputeJobType;
use crate::compute_job::MaybeBackPressureError;
use crate::configuration::PersistedQueriesPrewarmQueryPlanCache;
use crate::error::CacheResolverError;
Expand Down Expand Up @@ -273,12 +274,25 @@ where
config_mode_hash: _,
} in all_cache_keys
{
let doc = match query_analysis
.parse_document(&query, operation_name.as_deref())
.await
{
Ok(doc) => doc,
Err(_) => continue,
// NB: warmup tasks have a low priority so that real requests are prioritized
let doc = loop {
match query_analysis
.parse_document(
&query,
operation_name.as_deref(),
ComputeJobType::QueryParsingWarmup,
)
.await
{
Ok(doc) => break doc,
Err(MaybeBackPressureError::PermanentError(_)) => {
continue 'all_cache_keys_loop;
}
Err(MaybeBackPressureError::TemporaryError(ComputeBackPressureError)) => {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// try again
}
}
};

let caching_key = CachingQueryKey {
Expand Down Expand Up @@ -316,33 +330,14 @@ where
})
.await;
if entry.is_first() {
let doc = loop {
match query_analysis
.parse_document(&query, operation_name.as_deref())
.await
{
Ok(doc) => break doc,
Err(MaybeBackPressureError::PermanentError(error)) => {
let e = Arc::new(QueryPlannerError::SpecError(error));
tokio::spawn(async move {
entry.insert(Err(e)).await;
});
continue 'all_cache_keys_loop;
}
Err(MaybeBackPressureError::TemporaryError(ComputeBackPressureError)) => {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// try again
}
}
};

loop {
let request = QueryPlannerRequest {
query: query.clone(),
operation_name: operation_name.clone(),
document: doc.clone(),
metadata: caching_key.metadata.clone(),
plan_options: caching_key.plan_options.clone(),
compute_job_type: ComputeJobType::QueryPlanningWarmup,
};
let res = match service.ready().await {
Ok(service) => service.call(request).await,
Expand Down Expand Up @@ -510,6 +505,7 @@ where
.document(doc)
.metadata(caching_key.metadata)
.plan_options(caching_key.plan_options)
.compute_job_type(ComputeJobType::QueryPlanning)
.build();

// some clients might timeout and cancel the request before query planning is finished,
Expand Down Expand Up @@ -724,6 +720,8 @@ impl ValueType for Result<QueryPlannerContent, Arc<QueryPlannerError>> {

#[cfg(test)]
mod tests {
use std::time::Duration;

use mockall::mock;
use serde_json_bytes::json;
use test_log::test;
Expand Down Expand Up @@ -1177,4 +1175,85 @@ mod tests {
panic!("Expected both calls to return same error");
}
}

/// End-to-end check that `warm_up` repopulates a fresh planner's cache from a
/// previous planner's cache, and that subsequent calls are served from cache
/// without invoking the planning delegate again.
#[tokio::test]
async fn test_cache_warmup() {
    // Build a mock delegate: its single expected clone returns a planner that
    // must be called exactly `call_count` times, each call yielding a fake plan.
    let create_delegate = |call_count| {
        let mut delegate = MockMyQueryPlanner::new();
        delegate.expect_clone().times(1).returning(move || {
            let mut planner = MockMyQueryPlanner::new();
            planner.expect_sync_call().times(call_count).returning(|_| {
                let plan = Arc::new(QueryPlan::fake_new(None, None));
                Ok(QueryPlannerResponse::builder()
                    .content(QueryPlannerContent::Plan { plan })
                    .build())
            });
            planner
        });
        delegate
    };

    let configuration: Configuration = Default::default();
    let schema = Arc::new(
        Schema::parse(
            include_str!("../testdata/starstuff@current.graphql"),
            &configuration,
        )
        .unwrap(),
    );

    // Wrap a delegate in a caching planner with default settings.
    let create_planner = async |delegate| {
        CachingQueryPlanner::new(
            delegate,
            schema.clone(),
            Default::default(),
            &configuration,
            IndexMap::default(),
        )
        .await
        .unwrap()
    };

    // Build a caching request whose context already carries the parsed document.
    let create_request = || {
        let query_str = "query ExampleQuery { me { name } }".to_string();
        let doc = Query::parse_document(&query_str, None, &schema, &configuration).unwrap();
        let context = Context::new();
        context
            .extensions()
            .with_lock(|lock| lock.insert::<ParsedDocument>(doc));
        query_planner::CachingRequest::new(query_str, None, context)
    };

    // send query to caching planner. it should save this query plan in its cache
    let mut planner = create_planner(create_delegate(1)).await;
    let response = planner.call(create_request()).await.unwrap();
    assert!(response.content.is_some());
    assert_eq!(planner.cache.len().await, 1);

    // create and warm up a new planner. new planner's delegate should be called once during
    // the warm-up phase to populate the cache
    let query_analysis_layer =
        QueryAnalysisLayer::new(schema.clone(), Arc::new(configuration.clone())).await;
    let mut new_planner = create_planner(create_delegate(1)).await;
    new_planner
        .warm_up(
            &query_analysis_layer,
            &Arc::new(PersistedQueryLayer::new(&configuration).await.unwrap()),
            Some(planner.previous_cache()),
            Some(1),
            Default::default(),
            &Default::default(),
        )
        .await;
    // wait a beat - items are added to cache asynchronously, so this helps avoid flakiness
    tokio::time::sleep(Duration::from_millis(10)).await;
    assert_eq!(new_planner.cache.len().await, 1);

    // create a new delegate that _shouldn't_ be called since the new planner already has the
    // result in its cache
    new_planner.delegate = create_delegate(0);
    let response = new_planner.call(create_request()).await.unwrap();
    assert!(response.content.is_some());
    assert_eq!(new_planner.cache.len().await, 1);
}
}
Loading