diff --git a/.changesets/feat_zelda_feature_cooperative_cancellation.md b/.changesets/feat_zelda_feature_cooperative_cancellation.md new file mode 100644 index 0000000000..0490986c7e --- /dev/null +++ b/.changesets/feat_zelda_feature_cooperative_cancellation.md @@ -0,0 +1,22 @@ +## Cooperative Cancellation for Query Planning + +This release introduces cooperative cancellation support for query planning operations. This feature allows the router +to gracefully handle query planning timeouts and cancellations, improving resource utilization. +Metrics are emitted for cooperative cancellation: + +- Records the "outcome" of query planning on the `apollo.router.query_planning.plan.duration` metric. +- Records the "outcome" of query planning on the `query_planning` span. + +### Example + +Configuring a timeout in Measure Mode: +```yaml +supergraph: + query_planning: + experimental_cooperative_cancellation: + enabled: true + mode: measure + timeout: 1s +``` + +By [@Velfi](https://github.com/Velfi) in https://github.com/apollographql/router/pull/7604 diff --git a/Cargo.lock b/Cargo.lock index b3c2532030..ef31e4695c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -376,6 +376,7 @@ dependencies = [ "rustls-pemfile", "ryu", "schemars", + "scopeguard", "semver", "serde", "serde_derive_default", diff --git a/apollo-federation/src/query_plan/query_planner.rs b/apollo-federation/src/query_plan/query_planner.rs index 50cf638170..0bf6bca674 100644 --- a/apollo-federation/src/query_plan/query_planner.rs +++ b/apollo-federation/src/query_plan/query_planner.rs @@ -76,7 +76,7 @@ pub struct QueryPlannerConfig { // Side-note: implemented as an object instead of single boolean because we expect to add more // to this soon enough. In particular, once defer-passthrough to subgraphs is implemented, the // idea would be to add a new `passthrough_subgraphs` option that is the list of subgraphs to - // which we can pass-through some @defer (and it would be empty by default). 
Similarly, once we + // support @stream, grouping the options here will make sense too. pub incremental_delivery: QueryPlanIncrementalDeliveryConfig, @@ -185,7 +185,14 @@ pub struct QueryPlanOptions<'a> { /// progressive @override feature. // PORT_NOTE: In JS implementation this was a Map<string, boolean> pub override_conditions: Vec<String>, - + /// An optional function that will be called to check if the query plan should be cancelled. + /// + /// Cooperative cancellation occurs when the original client has abandoned the query. + /// When this happens, the query plan should be cancelled to free up resources. + /// + /// This function should return `ControlFlow::Break` if the query plan should be cancelled. + /// + /// Defaults to `None`. pub check_for_cooperative_cancellation: Option<&'a dyn Fn() -> ControlFlow<()>>, /// Impose a limit on the number of non-local selections, which can be a /// performance hazard. On by default.
diff --git a/apollo-federation/tests/query_plan/build_query_plan_tests/overrides.rs b/apollo-federation/tests/query_plan/build_query_plan_tests/overrides.rs index ed40dc3d39..6052e16b08 100644 --- a/apollo-federation/tests/query_plan/build_query_plan_tests/overrides.rs +++ b/apollo-federation/tests/query_plan/build_query_plan_tests/overrides.rs @@ -25,7 +25,6 @@ fn it_handles_progressive_override_on_root_fields() { "#, QueryPlanOptions { override_conditions: vec!["test".to_string()], - check_for_cooperative_cancellation: None, ..Default::default() }, @r###" @@ -120,7 +119,6 @@ fn it_handles_progressive_override_on_entity_fields() { "#, QueryPlanOptions { override_conditions: vec!["test".to_string()], - check_for_cooperative_cancellation: None, ..Default::default() }, @r###" @@ -281,7 +279,6 @@ fn it_handles_progressive_override_on_nested_entity_fields() { "#, QueryPlanOptions { override_conditions: vec!["test".to_string()], - check_for_cooperative_cancellation: None, ..Default::default() }, @r###" diff --git a/apollo-federation/tests/query_plan/build_query_plan_tests/overrides/shareable.rs b/apollo-federation/tests/query_plan/build_query_plan_tests/overrides/shareable.rs index be3ddff22f..98c3cf9e74 100644 --- a/apollo-federation/tests/query_plan/build_query_plan_tests/overrides/shareable.rs +++ b/apollo-federation/tests/query_plan/build_query_plan_tests/overrides/shareable.rs @@ -46,7 +46,6 @@ fn it_overrides_to_s2_when_label_is_provided() { "#, QueryPlanOptions { override_conditions: vec!["test".to_string()], - check_for_cooperative_cancellation: None, ..Default::default() }, @r###" @@ -160,7 +159,6 @@ fn it_overrides_f1_to_s3_when_label_is_provided() { "#, QueryPlanOptions { override_conditions: vec!["test".to_string()], - check_for_cooperative_cancellation: None, ..Default::default() }, @r###" diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index c9642dbd53..6c59ec3025 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ 
-263,6 +263,7 @@ form_urlencoded = "1.2.1" apollo-environment-detector = "0.1.0" log = "0.4.22" encoding_rs = "0.8.35" +scopeguard = "1.2.0" [target.'cfg(macos)'.dependencies] uname = "0.1.1" diff --git a/apollo-router/src/configuration/cooperative_cancellation.rs b/apollo-router/src/configuration/cooperative_cancellation.rs new file mode 100644 index 0000000000..f87f990a08 --- /dev/null +++ b/apollo-router/src/configuration/cooperative_cancellation.rs @@ -0,0 +1,74 @@ +use std::time::Duration; + +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; + +use crate::configuration::mode::Mode; + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[serde(deny_unknown_fields, default)] +pub(crate) struct CooperativeCancellation { + /// When true, cooperative cancellation is enabled. + enabled: bool, + /// When enabled, this sets whether the router will cancel query planning or + /// merely emit a metric when it would have happened. + mode: Mode, + #[serde(deserialize_with = "humantime_serde::deserialize")] + #[serde(serialize_with = "humantime_serde::serialize")] + #[schemars(with = "Option<String>")] + /// Enable timeout for query planning. + timeout: Option<Duration>, +} + +impl Default for CooperativeCancellation { + fn default() -> Self { + Self { + enabled: true, + mode: Mode::Measure, + timeout: None, + } + } } + +impl CooperativeCancellation { + /// Returns the timeout, if configured. + pub(crate) fn timeout(&self) -> Option<Duration> { + self.timeout + } + + #[cfg(test)] + /// Create a new `CooperativeCancellation` config in enforcement mode. + pub(crate) fn enabled() -> Self { + Self { + enabled: true, + mode: Mode::Enforce, + timeout: None, + } + } + + /// Returns true if cooperative cancellation is enabled. + pub(crate) fn is_enabled(&self) -> bool { + self.enabled + } + + /// Returns true if this config is in measure mode.
+ pub(crate) fn is_measure_mode(&self) -> bool { + self.mode.is_measure_mode() + } + + /// Returns true if this config is in enforce mode. + pub(crate) fn is_enforce_mode(&self) -> bool { + self.mode.is_enforce_mode() + } + + #[cfg(test)] + /// Create a new `CooperativeCancellation` config in enforcement mode with a timeout. + pub(crate) fn enabled_with_timeout(timeout: Duration) -> Self { + Self { + enabled: true, + mode: Mode::Enforce, + timeout: Some(timeout), + } + } +} diff --git a/apollo-router/src/configuration/mod.rs b/apollo-router/src/configuration/mod.rs index bf89dd8cc3..1705a15139 100644 --- a/apollo-router/src/configuration/mod.rs +++ b/apollo-router/src/configuration/mod.rs @@ -26,7 +26,6 @@ use regex::Regex; use rustls::ServerConfig; use rustls::pki_types::CertificateDer; use rustls::pki_types::PrivateKeyDer; -use schema::Mode; use schemars::JsonSchema; use schemars::r#gen::SchemaGenerator; use schemars::schema::ObjectValidation; @@ -50,6 +49,7 @@ use self::server::Server; use self::subgraph::SubgraphConfiguration; use crate::ApolloRouterError; use crate::cache::DEFAULT_CACHE_CAPACITY; +use crate::configuration::cooperative_cancellation::CooperativeCancellation; use crate::graphql; use crate::notification::Notify; use crate::plugin::plugins; @@ -63,10 +63,12 @@ use crate::plugins::subscription::SubscriptionConfig; use crate::uplink::UplinkConfig; pub(crate) mod connector; +pub(crate) mod cooperative_cancellation; pub(crate) mod cors; pub(crate) mod expansion; mod experimental; pub(crate) mod metrics; +pub(crate) mod mode; mod persisted_queries; pub(crate) mod schema; pub(crate) mod server; @@ -573,7 +575,8 @@ impl FromStr for Configuration { type Err = ConfigurationError; fn from_str(s: &str) -> Result { - schema::validate_yaml_configuration(s, Expansion::default()?, Mode::Upgrade)?.validate() + schema::validate_yaml_configuration(s, Expansion::default()?, schema::Mode::Upgrade)? 
+ .validate() } } @@ -900,6 +903,35 @@ pub(crate) struct QueryPlanning { /// If cache warm up is configured, this will allow the router to keep a query plan created with /// the old schema, if it determines that the schema update does not affect the corresponding query pub(crate) experimental_reuse_query_plans: bool, + + /// Configures cooperative cancellation of query planning + /// + /// See [`CooperativeCancellation`] for more details. + pub(crate) experimental_cooperative_cancellation: CooperativeCancellation, +} + +#[buildstructor::buildstructor] +impl QueryPlanning { + #[builder] + #[allow(dead_code)] + pub(crate) fn new( + cache: Option, + warmed_up_queries: Option, + experimental_plans_limit: Option, + experimental_paths_limit: Option, + experimental_reuse_query_plans: Option, + experimental_cooperative_cancellation: Option, + ) -> Self { + Self { + cache: cache.unwrap_or_default(), + warmed_up_queries, + experimental_plans_limit, + experimental_paths_limit, + experimental_reuse_query_plans: experimental_reuse_query_plans.unwrap_or_default(), + experimental_cooperative_cancellation: experimental_cooperative_cancellation + .unwrap_or_default(), + } + } } /// Cache configuration diff --git a/apollo-router/src/configuration/mode.rs b/apollo-router/src/configuration/mode.rs new file mode 100644 index 0000000000..1257b07cb2 --- /dev/null +++ b/apollo-router/src/configuration/mode.rs @@ -0,0 +1,24 @@ +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; + +// Don't add a default here. Instead, Default should be implemented for +// individual cases of Mode. +#[derive(Debug, Clone, Copy, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub(crate) enum Mode { + Measure, + Enforce, +} + +impl Mode { + /// Returns true if this config is in measure mode. + pub(crate) fn is_measure_mode(&self) -> bool { + matches!(self, Mode::Measure) + } + + /// Returns true if this config is in enforce mode. 
+ pub(crate) fn is_enforce_mode(&self) -> bool { + matches!(self, Mode::Enforce) + } +} diff --git a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap index 32e5517adf..970489f464 100644 --- a/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap +++ b/apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap @@ -2311,6 +2311,27 @@ expression: "&schema" ], "description": "Configures the context" }, + "CooperativeCancellation": { + "additionalProperties": false, + "properties": { + "enabled": { + "default": true, + "description": "When true, cooperative cancellation is enabled.", + "type": "boolean" + }, + "mode": { + "$ref": "#/definitions/Mode", + "description": "#/definitions/Mode" + }, + "timeout": { + "default": null, + "description": "Enable timeout for query planning.", + "nullable": true, + "type": "string" + } + }, + "type": "object" + }, "Cors": { "additionalProperties": false, "description": "Cross origin request configuration.", @@ -2658,8 +2679,8 @@ expression: "&schema" "type": "boolean" }, "mode": { - "$ref": "#/definitions/Mode", - "description": "#/definitions/Mode" + "$ref": "#/definitions/Mode2", + "description": "#/definitions/Mode2" }, "strategy": { "$ref": "#/definitions/StrategyConfig", @@ -4574,6 +4595,13 @@ expression: "&schema" ], "type": "string" }, + "Mode2": { + "enum": [ + "measure", + "enforce" + ], + "type": "string" + }, "MultipartRequest": { "additionalProperties": false, "description": "Configuration for a multipart request for file uploads.\n\nThis protocol conforms to [jaydenseric's multipart spec](https://github.com/jaydenseric/graphql-multipart-request-spec)", @@ -5076,6 +5104,10 @@ expression: "&schema" "$ref": "#/definitions/QueryPlanCache", "description": 
"#/definitions/QueryPlanCache" }, + "experimental_cooperative_cancellation": { + "$ref": "#/definitions/CooperativeCancellation", + "description": "#/definitions/CooperativeCancellation" + }, "experimental_paths_limit": { "default": null, "description": "Before creating query plans, for each path of fields in the query we compute all the possible options to traverse that path via the subgraphs. Multiple options can arise because fields in the path can be provided by multiple subgraphs, and abstract types (i.e. unions and interfaces) returned by fields sometimes require the query planner to traverse through each constituent object type. The number of options generated in this computation can grow large if the schema or query are sufficiently complex, and that will increase the time spent planning.\n\nThis config allows specifying a per-path limit to the number of options considered. If any path's options exceeds this limit, query planning will abort and the operation will fail.\n\nThe default value is None, which specifies no limit.", diff --git a/apollo-router/src/error.rs b/apollo-router/src/error.rs index 56dcdcb47d..a036b3b47b 100644 --- a/apollo-router/src/error.rs +++ b/apollo-router/src/error.rs @@ -292,6 +292,9 @@ pub(crate) enum QueryPlannerError { /// Federation error: {0} FederationError(FederationErrorBridge), + + /// Query planning timed out: {0} + Timeout(String), } impl From for QueryPlannerError { @@ -314,6 +317,8 @@ pub(crate) enum FederationErrorBridge { OperationNameNotProvided(String), /// {0} Other(String), + /// {0} + Cancellation(String), } impl From for FederationErrorBridge { @@ -325,6 +330,9 @@ impl From for FederationErrorBridge { err @ FederationError::SingleFederationError( apollo_federation::error::SingleFederationError::OperationNameNotProvided, ) => Self::OperationNameNotProvided(err.to_string()), + err @ FederationError::SingleFederationError( + apollo_federation::error::SingleFederationError::PlanningCancelled, + ) => 
Self::Cancellation(err.to_string()), err => Self::Other(err.to_string()), } } diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 93dd96c7d6..d84995caab 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -1,6 +1,8 @@ use std::hash::Hash; use std::hash::Hasher; use std::sync::Arc; +use std::sync::atomic::AtomicU8; +use std::sync::atomic::Ordering; use std::task; use futures::future::BoxFuture; @@ -10,6 +12,7 @@ use rand::seq::SliceRandom; use rand::thread_rng; use sha2::Digest; use sha2::Sha256; +use tokio_util::time::FutureExt; use tower::BoxError; use tower::ServiceBuilder; use tower::ServiceExt; @@ -27,6 +30,7 @@ use crate::compute_job::ComputeBackPressureError; use crate::compute_job::ComputeJobType; use crate::compute_job::MaybeBackPressureError; use crate::configuration::PersistedQueriesPrewarmQueryPlanCache; +use crate::configuration::cooperative_cancellation::CooperativeCancellation; use crate::error::CacheResolverError; use crate::error::QueryPlannerError; use crate::plugins::authorization::AuthorizationPlugin; @@ -48,6 +52,14 @@ use crate::spec::Schema; use crate::spec::SchemaHash; use crate::spec::SpecError; +#[repr(u8)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +enum Outcome { + None = 0, + Timeout = 1, + Cancelled = 2, +} + /// An [`IndexMap`] of available plugins. 
pub(crate) type Plugins = IndexMap>; pub(crate) type InMemoryCachePlanner = @@ -92,6 +104,7 @@ pub(crate) struct CachingQueryPlanner { plugins: Arc, enable_authorization_directives: bool, config_mode_hash: Arc, + cooperative_cancellation: CooperativeCancellation, } fn init_query_plan_from_redis( @@ -99,7 +112,7 @@ fn init_query_plan_from_redis( cache_entry: &mut Result>, ) -> Result<(), String> { if let Ok(QueryPlannerContent::Plan { plan }) = cache_entry { - // Arc freshly deserialized from Redis should be unique, so this doesn’t clone: + // Arc freshly deserialized from Redis should be unique, so this doesn't clone: let plan = Arc::make_mut(plan); let root = Arc::make_mut(&mut plan.root); root.init_parsed_operations(subgraph_schemas) @@ -139,6 +152,11 @@ where let mut hasher = StructHasher::new(); configuration.rust_query_planner_config().hash(&mut hasher); let config_mode_hash = Arc::new(ConfigModeHash(hasher.finalize())); + let cooperative_cancellation = configuration + .supergraph + .query_planning + .experimental_cooperative_cancellation + .clone(); Ok(Self { cache, @@ -147,6 +165,7 @@ where subgraph_schemas, plugins: Arc::new(plugins), enable_authorization_directives, + cooperative_cancellation, config_mode_hash, }) } @@ -387,15 +406,14 @@ impl CachingQueryPlanner { } } -impl tower::Service - for CachingQueryPlanner +impl Service for CachingQueryPlanner where - T: tower::Service< + T: Service< QueryPlannerRequest, Response = QueryPlannerResponse, Error = MaybeBackPressureError, >, - >::Future: Send, + >::Future: Send, { type Response = QueryPlannerResponse; type Error = CacheResolverError; @@ -425,21 +443,29 @@ where } } +const BACKPRESSURE: &str = "backpressure"; +const BATCHING_ERROR: &str = "batching_error"; +const CANCELLED: &str = "cancelled"; +const ERROR: &str = "error"; +const OUTCOME: &str = "outcome"; +const SUCCESS: &str = "success"; +const TIMEOUT: &str = "timeout"; + impl CachingQueryPlanner where - T: tower::Service< + T: Service< 
QueryPlannerRequest, Response = QueryPlannerResponse, Error = MaybeBackPressureError, > + Clone + Send + 'static, - >::Future: Send, + >::Future: Send, { async fn plan( mut self, request: query_planner::CachingRequest, - ) -> Result<>::Response, CacheResolverError> { + ) -> Result<>::Response, CacheResolverError> { if self.enable_authorization_directives { AuthorizationPlugin::update_cache_key(&request.context); } @@ -507,96 +533,205 @@ where .compute_job_type(ComputeJobType::QueryPlanning) .build(); - // some clients might timeout and cancel the request before query planning is finished, - // so we execute it in a task that can continue even after the request was canceled and - // the join handle was dropped. That way, the next similar query will use the cache instead - // of restarting the query planner until another timeout - tokio::task::spawn( - async move { - let service = match self.delegate.ready().await { - Ok(service) => service, - Err(MaybeBackPressureError::PermanentError(error)) => { - let e = Arc::new(error); - let err = e.clone(); - tokio::spawn(async move { - entry.insert(Err(err)).await; - }); - return Err(CacheResolverError::RetrievalError(e)); - } - Err(MaybeBackPressureError::TemporaryError(error)) => { - let err = error.clone(); - tokio::spawn(async move { - // Temporary errors are never cached - entry.send(Err(err)).await; - }); - return Err(CacheResolverError::Backpressure(error)); - } - }; + let planning_task = async move { + let service = match self.delegate.ready().await { + Ok(service) => service, + Err(MaybeBackPressureError::PermanentError(error)) => { + let e = Arc::new(error); + let err = e.clone(); + tokio::spawn(async move { + entry.insert(Err(err)).await; + }); + return Err(CacheResolverError::RetrievalError(e)); + } + Err(MaybeBackPressureError::TemporaryError(error)) => { + let err = error.clone(); + tokio::spawn(async move { + // Temporary errors are never cached + entry.send(Err(err)).await; + }); + return 
Err(CacheResolverError::Backpressure(error)); + } + }; - let res = service.call(request).await; + let res = service.call(request).await; - match res { - Ok(QueryPlannerResponse { content, errors }) => { - if let Some(content) = content.clone() { - let can_cache = match &content { - // Already cached in an introspection-specific, small-size, - // in-memory-only cache. - QueryPlannerContent::CachedIntrospectionResponse { .. } => { - false - } - _ => true, - }; - - if can_cache { - tokio::spawn(async move { - entry.insert(Ok(content)).await; - }); - } else { - tokio::spawn(async move { - entry.send(Ok(Ok(content))).await; - }); - } - } + match res { + Ok(QueryPlannerResponse { content, errors }) => { + if let Some(content) = content.clone() { + let can_cache = match &content { + // Already cached in an introspection-specific, small-size, + // in-memory-only cache. + QueryPlannerContent::CachedIntrospectionResponse { .. } => false, + _ => true, + }; - // This will be overridden by the Rust usage reporting implementation - if let Some(QueryPlannerContent::Plan { plan, .. }) = &content { - context.extensions().with_lock(|lock| { - lock.insert::>(plan.usage_reporting.clone()) + if can_cache { + tokio::spawn(async move { + entry.insert(Ok(content)).await; + }); + } else { + tokio::spawn(async move { + entry.send(Ok(Ok(content))).await; }); } - Ok(QueryPlannerResponse { content, errors }) } - Err(MaybeBackPressureError::PermanentError(error)) => { - let e = Arc::new(error); - let err = e.clone(); - tokio::spawn(async move { - entry.insert(Err(err)).await; + + // This will be overridden by the Rust usage reporting implementation + if let Some(QueryPlannerContent::Plan { plan, .. 
}) = &content { + context.extensions().with_lock(|lock| { + lock.insert::>(plan.usage_reporting.clone()) }); - if let Some(usage_reporting) = e.usage_reporting() { - context.extensions().with_lock(|lock| { - lock.insert::>(Arc::new(usage_reporting)); - }); - } - Err(CacheResolverError::RetrievalError(e)) } - Err(MaybeBackPressureError::TemporaryError(error)) => { - let err = error.clone(); - tokio::spawn(async move { - // Temporary errors are never cached - entry.send(Err(err)).await; + Ok(QueryPlannerResponse { content, errors }) + } + Err(MaybeBackPressureError::PermanentError(error)) => { + let e = Arc::new(error); + let err = e.clone(); + tokio::spawn(async move { + entry.insert(Err(err)).await; + }); + if let Some(usage_reporting) = e.usage_reporting() { + context.extensions().with_lock(|lock| { + lock.insert::>(Arc::new(usage_reporting)); }); - Err(CacheResolverError::Backpressure(error)) } + Err(CacheResolverError::RetrievalError(e)) + } + Err(MaybeBackPressureError::TemporaryError(error)) => { + let err = error.clone(); + tokio::spawn(async move { + // Temporary errors are never cached + entry.send(Err(err)).await; + }); + Err(CacheResolverError::Backpressure(error)) } } - .in_current_span(), - ) - .await - .map_err(|e| { + } + .in_current_span(); + + fn convert_join_error(e: impl std::fmt::Display) -> CacheResolverError { CacheResolverError::RetrievalError(Arc::new(QueryPlannerError::JoinError( e.to_string(), ))) - })? + } + + // When cooperative cancellation is enabled, we want to cancel the query planner + // task if the request is canceled. 
+ if self.cooperative_cancellation.is_enabled() { + let outcome_recorded = Arc::new(AtomicU8::new(Outcome::None as u8)); + let planning_task = tokio::task::spawn(planning_task); + let outcome_recorded_for_abort = outcome_recorded.clone(); + let enforce_mode = self.cooperative_cancellation.is_enforce_mode(); + let measure_mode = self.cooperative_cancellation.is_measure_mode(); + let _abort_guard = + scopeguard::guard(planning_task.abort_handle(), move |abort_handle| { + // Client drop handler + // Only record outcome if not already recorded, and only if timeout hasn't already been recorded + if outcome_recorded_for_abort + .compare_exchange( + Outcome::None as u8, + Outcome::Cancelled as u8, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .is_ok() + { + if enforce_mode { + abort_handle.abort(); + } + tracing::Span::current().record(OUTCOME, CANCELLED); + } + }); + + match self.cooperative_cancellation.timeout() { + Some(timeout) => { + if enforce_mode { + fn convert_timeout_error( + e: impl std::fmt::Display, + ) -> CacheResolverError { + CacheResolverError::RetrievalError(Arc::new( + QueryPlannerError::Timeout(e.to_string()), + )) + } + + let outcome_recorded_for_timeout = outcome_recorded.clone(); + let planning_task_with_timeout = planning_task.timeout(timeout); + let res = planning_task_with_timeout.await; + // If timeout occurred, record outcome (if not already recorded) + if res.is_err() + && outcome_recorded_for_timeout + .compare_exchange( + Outcome::None as u8, + Outcome::Timeout as u8, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .is_ok() + { + tracing::Span::current().record(OUTCOME, TIMEOUT); + } + res.map_err(convert_timeout_error)? 
+ } else if measure_mode { + // In measure mode, spawn a timeout task that only records outcome + let outcome_recorded_for_timeout = outcome_recorded.clone(); + let timeout_task = tokio::task::spawn(async move { + tokio::time::sleep(timeout).await; + if outcome_recorded_for_timeout + .compare_exchange( + Outcome::None as u8, + Outcome::Timeout as u8, + Ordering::SeqCst, + Ordering::SeqCst, + ) + .is_ok() + { + tracing::Span::current().record(OUTCOME, TIMEOUT); + } + }); + let _dropped_timeout_guard = + scopeguard::guard(timeout_task.abort_handle(), |abort_handle| { + abort_handle.abort(); + }); + planning_task.await + } else { + unreachable!( + "Can't set a timeout without enabling cooperative cancellation" + ); + } + } + None => planning_task.await, + } + } else { + // some clients might timeout and cancel the request before query planning is finished, + // so we execute it in a task that can continue even after the request was canceled and + // the join handle was dropped. That way, the next similar query will use the cache instead + // of restarting the query planner until another timeout + tokio::task::spawn(planning_task).await + } + .inspect(|res| { + // We won't reach this code path if the plan was cancelled, and + // thus it won't overwrite the outcome. + match res { + Ok(_) => { + tracing::Span::current().record(OUTCOME, SUCCESS); + } + Err(CacheResolverError::RetrievalError(e)) => { + if matches!(e.as_ref(), QueryPlannerError::Timeout(_)) { + tracing::Span::current().record(OUTCOME, TIMEOUT); + } else { + tracing::Span::current().record(OUTCOME, ERROR); + }; + } + Err(CacheResolverError::Backpressure(_)) => { + tracing::Span::current().record(OUTCOME, BACKPRESSURE); + } + Err(CacheResolverError::BatchingError(_)) => { + tracing::Span::current().record(OUTCOME, BATCHING_ERROR); + } + }; + }) + .map_err(convert_join_error)? 
} else { let res = entry.get().await.map_err(|e| match e { EntryError::IsFirst | // IsFirst should be unreachable @@ -719,22 +854,83 @@ impl ValueType for Result> { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::time::Duration; use mockall::mock; + use parking_lot::Mutex; use serde_json_bytes::json; use test_log::test; use tower::Service; + use tracing::Subscriber; + use tracing_core::Field; + use tracing_subscriber::Layer; + use tracing_subscriber::Registry; + use tracing_subscriber::layer::Context as TracingContext; + use tracing_subscriber::prelude::*; use super::*; use crate::Configuration; use crate::Context; use crate::apollo_studio_interop::UsageReporting; + use crate::configuration::QueryPlanning; + use crate::configuration::Supergraph; use crate::json_ext::Object; use crate::query_planner::QueryPlan; use crate::spec::Query; use crate::spec::Schema; + // Custom layer that records any field updates on spans. + #[derive(Default, Clone)] + struct RecordingLayer { + values: Arc>>, + } + + impl RecordingLayer { + fn get(&self, key: &str) -> Option { + self.values.lock().get(key).cloned() + } + } + + impl Layer for RecordingLayer + where + S: Subscriber, + { + fn on_record( + &self, + _span: &tracing::span::Id, + values: &tracing::span::Record<'_>, + _ctx: TracingContext<'_, S>, + ) { + let mut guard = self.values.lock(); + struct Visitor<'a> { + map: &'a mut HashMap, + } + + impl<'a> tracing_core::field::Visit for Visitor<'a> { + fn record_str(&mut self, field: &Field, value: &str) { + self.map.insert(field.name().to_string(), value.to_string()); + } + + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + self.map + .insert(field.name().to_string(), format!("{:?}", value)); + } + } + + let mut visitor = Visitor { map: &mut guard }; + values.record(&mut visitor); + } + } + + // Helper function to set up tracing for tests + fn setup_tracing() -> (RecordingLayer, tracing::subscriber::DefaultGuard) { + let layer = 
RecordingLayer::default(); + let subscriber = Registry::default().with(layer.clone()); + let guard = tracing::subscriber::set_default(subscriber); + (layer, guard) + } + mock! { #[derive(Debug)] MyQueryPlanner { @@ -847,6 +1043,225 @@ mod tests { ); } + #[test(tokio::test)] + async fn test_cooperative_cancellation_timeout() { + let (layer, _guard) = setup_tracing(); + + #[derive(Clone)] + struct SlowQueryPlanner; + + impl Service for SlowQueryPlanner { + type Response = QueryPlannerResponse; + type Error = MaybeBackPressureError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut task::Context<'_>, + ) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: QueryPlannerRequest) -> Self::Future { + Box::pin(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + panic!("This query planner should not be called, as it is expected to timeout"); + }) + } + } + + let configuration = Configuration::builder() + .and_supergraph(Some( + Supergraph::builder() + .query_planning( + QueryPlanning::builder() + .experimental_cooperative_cancellation( + CooperativeCancellation::enabled_with_timeout( + std::time::Duration::from_secs(1), + ), + ) + .build(), + ) + .build(), + )) + .build() + .expect("configuration is valid"); + let schema = include_str!("testdata/schema.graphql"); + let schema = Arc::new(Schema::parse(schema, &configuration).unwrap()); + + let mut planner = CachingQueryPlanner::new( + SlowQueryPlanner, + schema.clone(), + Default::default(), + &configuration, + IndexMap::default(), + ) + .await + .unwrap(); + + let doc = Query::parse_document( + "query Me { me { name { first } } }", + None, + &schema, + &configuration, + ) + .unwrap(); + + let context = Context::new(); + context + .extensions() + .with_lock(|lock| lock.insert::(doc)); + + // Create a span with the outcome field declared + let span = tracing::info_span!("test_span", outcome = tracing::field::Empty); + // Keep the span alive and 
ensure it's the current span during the entire operation + let _span_guard = span.enter(); + + let result = planner + .call(query_planner::CachingRequest::new( + "query Me { me { name { first } } }".to_string(), + Some("".into()), + context.clone(), + )) + .await; + + match result { + Ok(_) => panic!("Expected an error, but got a response"), + Err(e) => { + assert!(matches!(e, CacheResolverError::RetrievalError(_))); + assert!(e.to_string().contains("timed out")); + } + } + + // Give a small delay to ensure the span is recorded + tokio::time::sleep(Duration::from_millis(10)).await; + + // Verify that the span recorded the timeout outcome + assert_eq!(layer.get("outcome"), Some("timeout".to_string())); + } + + #[test(tokio::test)] + async fn test_cooperative_cancellation_client_drop() { + use std::sync::Arc; + + use tokio::sync::Barrier; + + let (layer, _guard) = setup_tracing(); + let barrier = Arc::new(Barrier::new(2)); + let barrier_clone = barrier.clone(); + + #[derive(Clone)] + struct SlowQueryPlanner { + barrier: Arc, + } + + impl Service for SlowQueryPlanner { + type Response = QueryPlannerResponse; + type Error = MaybeBackPressureError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + _cx: &mut task::Context<'_>, + ) -> task::Poll> { + task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: QueryPlannerRequest) -> Self::Future { + let barrier = self.barrier.clone(); + Box::pin(async move { + // Signal that we've started + barrier.wait().await; + + // Now sleep for a long time - this should get cancelled + tokio::time::sleep(Duration::from_secs(10)).await; + panic!( + "This query planner should not complete, as it should be cancelled by client drop" + ); + }) + } + } + + let configuration = Configuration::builder() + .and_supergraph(Some( + Supergraph::builder() + .query_planning( + QueryPlanning::builder() + .experimental_cooperative_cancellation( + CooperativeCancellation::enabled(), + ) + .build(), + ) + .build(), + )) + 
.build() + .expect("configuration is valid"); + let schema = include_str!("testdata/schema.graphql"); + let schema = Arc::new(Schema::parse(schema, &configuration).unwrap()); + + let mut planner = CachingQueryPlanner::new( + SlowQueryPlanner { + barrier: barrier_clone, + }, + schema.clone(), + Default::default(), + &configuration, + IndexMap::default(), + ) + .await + .unwrap(); + + let doc = Query::parse_document( + "query Me { me { name { first } } }", + None, + &schema, + &configuration, + ) + .unwrap(); + + let context = Context::new(); + context + .extensions() + .with_lock(|lock| lock.insert::(doc)); + + // Create a span with the outcome field declared + let span = tracing::info_span!("test_span", outcome = tracing::field::Empty); + + // Keep the span alive and ensure it's the current span during the entire operation + let _span_guard = span.enter(); + + // Spawn the planning task + let planning_task = tokio::spawn(async move { + planner + .call(query_planner::CachingRequest::new( + "query Me { me { name { first } } }".to_string(), + Some("".into()), + context.clone(), + )) + .await + }); + + // Wait for the inner SlowQueryPlanner task to start + barrier.wait().await; + + // Now abort the outer task - the inner task should have definitely started + planning_task.abort(); + + // Verify the task was cancelled + match planning_task.await { + Ok(_) => panic!( + "Expected the task to be aborted due to client drop, but it completed successfully" + ), + Err(e) => assert!(e.is_cancelled(), "Task should be cancelled, got: {:?}", e), + } + + // Give a small delay to ensure the span is recorded + tokio::time::sleep(Duration::from_millis(10)).await; + + // Verify that the span recorded the cancelled outcome + assert_eq!(layer.get("outcome"), Some("cancelled".to_string())); + } + macro_rules! 
test_query_plan { () => { include_str!("testdata/query_plan.json") @@ -900,7 +1315,7 @@ mod tests { .await .unwrap(); - let context = Context::new(); + let context = crate::Context::new(); context .extensions() .with_lock(|lock| lock.insert::(doc)); diff --git a/apollo-router/src/query_planner/query_planner_service.rs b/apollo-router/src/query_planner/query_planner_service.rs index 4af76f5499..57729b7269 100644 --- a/apollo-router/src/query_planner/query_planner_service.rs +++ b/apollo-router/src/query_planner/query_planner_service.rs @@ -184,7 +184,16 @@ impl QueryPlannerService { let result = result.map_err(FederationErrorBridge::from); let elapsed = start.elapsed().as_secs_f64(); - metric_query_planning_plan_duration(RUST_QP_MODE, elapsed); + match &result { + Ok(_) => metric_query_planning_plan_duration(RUST_QP_MODE, elapsed, "success"), + Err(FederationErrorBridge::Cancellation(e)) if e.contains("timeout") => { + metric_query_planning_plan_duration(RUST_QP_MODE, elapsed, "timeout") + } + Err(FederationErrorBridge::Cancellation(_)) => { + metric_query_planning_plan_duration(RUST_QP_MODE, elapsed, "cancelled") + } + Err(_) => metric_query_planning_plan_duration(RUST_QP_MODE, elapsed, "error"), + } let plan = result?; let root_node = convert_root_query_plan_node(&plan); @@ -603,12 +612,17 @@ pub(crate) struct QueryPlanResult { pub(super) evaluated_plan_paths: u64, } -pub(crate) fn metric_query_planning_plan_duration(planner: &'static str, elapsed: f64) { +pub(crate) fn metric_query_planning_plan_duration( + planner: &'static str, + elapsed: f64, + outcome: &'static str, +) { f64_histogram!( "apollo.router.query_planning.plan.duration", "Duration of the query planning, in seconds.", elapsed, - "planner" = planner + "planner" = planner, + "outcome" = outcome ); } @@ -1205,11 +1219,12 @@ mod tests { fn test_metric_query_planning_plan_duration() { let start = Instant::now(); let elapsed = start.elapsed().as_secs_f64(); - 
metric_query_planning_plan_duration(RUST_QP_MODE, elapsed); + metric_query_planning_plan_duration(RUST_QP_MODE, elapsed, "success"); assert_histogram_exists!( "apollo.router.query_planning.plan.duration", f64, - "planner" = "rust" + "planner" = "rust", + "outcome" = "success" ); } diff --git a/apollo-router/src/query_planner/tests.rs b/apollo-router/src/query_planner/tests.rs index fab54b7237..1fce6fa816 100644 --- a/apollo-router/src/query_planner/tests.rs +++ b/apollo-router/src/query_planner/tests.rs @@ -78,7 +78,7 @@ fn service_usage() { /// The query planner reports the failed subgraph fetch as an error with a reason of "service /// closed", which is what this test expects. #[tokio::test] -async fn mock_subgraph_service_withf_panics_should_be_reported_as_service_closed() { +async fn mock_subgraph_service_with_panics_should_be_reported_as_service_closed() { let query_plan: QueryPlan = QueryPlan { root: serde_json::from_str(test_query_plan!()).unwrap(), formatted_query_plan: Default::default(), diff --git a/docs/source/routing/performance/caching/in-memory.mdx b/docs/source/routing/performance/caching/in-memory.mdx index b8ff6d6b0b..4c632f3cd0 100644 --- a/docs/source/routing/performance/caching/in-memory.mdx +++ b/docs/source/routing/performance/caching/in-memory.mdx @@ -72,6 +72,8 @@ To get more information on the planning and warm-up process use the following me * histograms: * `apollo.router.query_planning.plan.duration`: time spent planning queries + * `planner`: The query planner implementation used (`rust` or `js`) + * `outcome`: The outcome of the query planning process (`success`, `timeout`, `cancelled`, `error`) * `apollo.router.schema.load.duration`: time spent loading a schema * `apollo.router.cache.hit.time{kind="query planner", storage=""}`: time to get a value from the cache * `apollo.router.cache.miss.time{kind="query planner", storage=""}`