diff --git a/apollo-router/src/axum_factory/tests.rs b/apollo-router/src/axum_factory/tests.rs index 18f9aa7ae5..9f7e2582fb 100644 --- a/apollo-router/src/axum_factory/tests.rs +++ b/apollo-router/src/axum_factory/tests.rs @@ -2431,7 +2431,14 @@ async fn test_supergraph_timeout() { // we do the entire supergraph rebuilding instead of using `from_supergraph_mock_callback_and_configuration` // because we need the plugins to apply on the supergraph - let mut plugins = create_plugins(&conf, &schema, planner.subgraph_schemas(), None, None) + let subgraph_schemas = Arc::new( + planner + .subgraph_schemas() + .iter() + .map(|(k, v)| (k.clone(), v.schema.clone())) + .collect(), + ); + let mut plugins = create_plugins(&conf, &schema, subgraph_schemas, None, None) .await .unwrap(); diff --git a/apollo-router/src/metrics/mod.rs b/apollo-router/src/metrics/mod.rs index 03345356fa..99439193c7 100644 --- a/apollo-router/src/metrics/mod.rs +++ b/apollo-router/src/metrics/mod.rs @@ -149,7 +149,9 @@ pub(crate) mod test_utils { pub(crate) fn collect_metrics() -> Metrics { let mut metrics = Metrics::default(); let (_, reader) = meter_provider_and_readers(); - reader.collect(&mut metrics.resource_metrics).unwrap(); + reader + .collect(&mut metrics.resource_metrics) + .expect("Failed to collect metrics. Did you forget to use `async{}.with_metrics()`? See dev-docs/metrics.md"); metrics } @@ -926,6 +928,10 @@ macro_rules! assert_metric { }; } +/// Assert the value of a counter metric that has the given name and attributes. +/// +/// In asynchronous tests, you must use [`FutureMetricsExt::with_metrics`]. See dev-docs/metrics.md +/// for details: #[cfg(test)] macro_rules! assert_counter { ($($name:ident).+, $value: expr, $($attr_key:literal = $attr_value:expr),+) => { @@ -965,6 +971,10 @@ macro_rules! assert_counter { }; } +/// Assert the value of a counter metric that has the given name and attributes. +/// +/// In asynchronous tests, you must use [`FutureMetricsExt::with_metrics`]. See dev-docs/metrics.md +/// for details: #[cfg(test)] macro_rules! assert_up_down_counter { @@ -998,6 +1008,10 @@ macro_rules! assert_up_down_counter { }; } +/// Assert the value of a gauge metric that has the given name and attributes. +/// +/// In asynchronous tests, you must use [`FutureMetricsExt::with_metrics`]. See dev-docs/metrics.md +/// for details: #[cfg(test)] macro_rules! assert_gauge { @@ -1064,6 +1078,10 @@ macro_rules! assert_histogram_count { }; } +/// Assert the sum value of a histogram metric with the given name and attributes. +/// +/// In asynchronous tests, you must use [`FutureMetricsExt::with_metrics`]. See dev-docs/metrics.md +/// for details: #[cfg(test)] macro_rules! assert_histogram_sum { @@ -1097,6 +1115,10 @@ macro_rules! assert_histogram_sum { }; } +/// Assert that a histogram metric exists with the given name and attributes. +/// +/// In asynchronous tests, you must use [`FutureMetricsExt::with_metrics`]. See dev-docs/metrics.md +/// for details: #[cfg(test)] macro_rules! assert_histogram_exists { @@ -1130,6 +1152,10 @@ macro_rules! assert_histogram_exists { }; } +/// Assert that a histogram metric does not exist with the given name and attributes. +/// +/// In asynchronous tests, you must use [`FutureMetricsExt::with_metrics`]. See dev-docs/metrics.md +/// for details: #[cfg(test)] macro_rules! assert_histogram_not_exists { @@ -1163,6 +1189,13 @@ macro_rules! assert_histogram_not_exists { }; } +/// Assert that all metrics match an [insta] snapshot. +/// +/// Consider using [assert_non_zero_metrics_snapshot] to produce more grokkable snapshots if +/// zero-valued metrics are not relevant to your test. +/// +/// In asynchronous tests, you must use [`FutureMetricsExt::with_metrics`]. See dev-docs/metrics.md +/// for details: #[cfg(test)] #[allow(unused_macros)] macro_rules! assert_metrics_snapshot { @@ -1181,6 +1214,10 @@ macro_rules! assert_metrics_snapshot { }; } +/// Assert that all metrics with a non-zero value match an [insta] snapshot. +/// +/// In asynchronous tests, you must use [`FutureMetricsExt::with_metrics`]. See dev-docs/metrics.md +/// for details: #[cfg(test)] #[allow(unused_macros)] macro_rules! assert_non_zero_metrics_snapshot { diff --git a/apollo-router/src/plugin/mod.rs b/apollo-router/src/plugin/mod.rs index 2b09408d63..dcf8c6c1ee 100644 --- a/apollo-router/src/plugin/mod.rs +++ b/apollo-router/src/plugin/mod.rs @@ -45,7 +45,6 @@ use tower::ServiceBuilder; use crate::graphql; use crate::layers::ServiceBuilderExt; use crate::notification::Notify; -use crate::query_planner::fetch::SubgraphSchemas; use crate::router_factory::Endpoint; use crate::services::execution; use crate::services::router; @@ -75,7 +74,7 @@ pub struct PluginInit { pub(crate) supergraph_schema: Arc>, /// The parsed subgraph schemas from the query planner, keyed by subgraph name - pub(crate) subgraph_schemas: Arc, + pub(crate) subgraph_schemas: Arc>>>, /// Launch ID pub(crate) launch_id: Option>, @@ -176,7 +175,7 @@ where supergraph_sdl: Arc, supergraph_schema_id: Arc, supergraph_schema: Arc>, - subgraph_schemas: Option>, + subgraph_schemas: Option>>>>, launch_id: Option>>, notify: Notify, ) -> Self { @@ -201,7 +200,7 @@ where supergraph_sdl: Arc, supergraph_schema_id: Arc, supergraph_schema: Arc>, - subgraph_schemas: Option>, + subgraph_schemas: Option>>>>, launch_id: Option>, notify: Notify, ) -> Result { @@ -224,7 +223,7 @@ where supergraph_sdl: Option>, supergraph_schema_id: Option>, supergraph_schema: Option>>, - subgraph_schemas: Option>, + subgraph_schemas: Option>>>>, launch_id: Option>, notify: Option>, ) -> Self { diff --git a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs index 7881b5abf8..686ade2eb6 100644 --- a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs +++ b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs @@ -742,7 +742,7 @@ mod tests { let mut demand_controlled_subgraph_schemas = HashMap::new(); for (subgraph_name, subgraph_schema) in planner.subgraph_schemas().iter() { let demand_controlled_subgraph_schema = - DemandControlledSchema::new(subgraph_schema.clone()).unwrap(); + DemandControlledSchema::new(subgraph_schema.schema.clone()).unwrap(); demand_controlled_subgraph_schemas .insert(subgraph_name.to_string(), demand_controlled_subgraph_schema); } diff --git a/apollo-router/src/plugins/include_subgraph_errors.rs b/apollo-router/src/plugins/include_subgraph_errors.rs index ecbfb5a45a..2b047adbbd 100644 --- a/apollo-router/src/plugins/include_subgraph_errors.rs +++ b/apollo-router/src/plugins/include_subgraph_errors.rs @@ -248,7 +248,13 @@ mod test { .await .unwrap(); let schema = planner.schema(); - let subgraph_schemas = planner.subgraph_schemas(); + let subgraph_schemas = Arc::new( + planner + .subgraph_schemas() + .iter() + .map(|(k, v)| (k.clone(), v.schema.clone())) + .collect(), + ); let builder = PluggableSupergraphServiceBuilder::new(planner); diff --git a/apollo-router/src/plugins/test.rs b/apollo-router/src/plugins/test.rs index 6120608f61..8b68888049 100644 --- a/apollo-router/src/plugins/test.rs +++ b/apollo-router/src/plugins/test.rs @@ -110,7 +110,12 @@ impl> + 'static> PluginTestHarness { .supergraph_schema_id(crate::spec::Schema::schema_id(&supergraph_sdl).into()) .supergraph_sdl(supergraph_sdl) .supergraph_schema(Arc::new(parsed_schema)) - .subgraph_schemas(subgraph_schemas) + .subgraph_schemas(Arc::new( + subgraph_schemas + .iter() + .map(|(k, v)| (k.clone(), v.schema.clone())) + .collect(), + )) .notify(Notify::default()) .build(); diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index a0ce94329f..695018587d 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -531,7 +531,13 @@ mod test { let planner = QueryPlannerService::new(schema.clone(), config.clone()) .await .unwrap(); - let subgraph_schemas = planner.subgraph_schemas(); + let subgraph_schemas = Arc::new( + planner + .subgraph_schemas() + .iter() + .map(|(k, v)| (k.clone(), v.schema.clone())) + .collect(), + ); let mut builder = PluggableSupergraphServiceBuilder::new(planner).with_configuration(config.clone()); diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index 58e01074df..90e88e04ab 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -1,10 +1,8 @@ -use std::collections::HashMap; use std::hash::Hash; use std::hash::Hasher; use std::sync::Arc; use std::task; -use apollo_compiler::validation::Valid; use futures::future::BoxFuture; use indexmap::IndexMap; use query_planner::QueryPlannerPlugin; @@ -61,7 +59,7 @@ pub(crate) struct CachingQueryPlanner { >, delegate: T, schema: Arc, - subgraph_schemas: Arc>>>, + subgraph_schemas: Arc, plugins: Arc, enable_authorization_directives: bool, config_mode_hash: Arc, @@ -94,7 +92,7 @@ where pub(crate) async fn new( delegate: T, schema: Arc, - subgraph_schemas: Arc>>>, + subgraph_schemas: Arc, configuration: &Configuration, plugins: Plugins, ) -> Result, BoxError> { @@ -339,9 +337,7 @@ where } impl CachingQueryPlanner { - pub(crate) fn subgraph_schemas( - &self, - ) -> Arc>>> { + pub(crate) fn subgraph_schemas(&self) -> Arc { self.delegate.subgraph_schemas() } diff --git a/apollo-router/src/query_planner/execution.rs b/apollo-router/src/query_planner/execution.rs index 0c975df1c1..84fb19bb95 100644 --- a/apollo-router/src/query_planner/execution.rs +++ b/apollo-router/src/query_planner/execution.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use apollo_compiler::validation::Valid; use futures::future::join_all; use futures::prelude::*; use tokio::sync::broadcast; @@ -25,6 +24,7 @@ use crate::json_ext::Value; use crate::json_ext::ValueExt; use crate::plugins::subscription::SubscriptionConfig; use crate::query_planner::fetch::FetchNode; +use crate::query_planner::fetch::SubgraphSchemas; use crate::query_planner::fetch::Variables; use crate::query_planner::FlattenNode; use crate::query_planner::Primary; @@ -56,7 +56,7 @@ impl QueryPlan { service_factory: &'a Arc, supergraph_request: &'a Arc>, schema: &'a Arc, - subgraph_schemas: &'a Arc>>>, + subgraph_schemas: &'a Arc, sender: mpsc::Sender, subscription_handle: Option, subscription_config: &'a Option, @@ -112,7 +112,7 @@ pub(crate) struct ExecutionParameters<'a> { pub(crate) context: &'a Context, pub(crate) service_factory: &'a Arc, pub(crate) schema: &'a Arc, - pub(crate) subgraph_schemas: &'a Arc>>>, + pub(crate) subgraph_schemas: &'a Arc, pub(crate) supergraph_request: &'a Arc>, pub(crate) deferred_fetches: &'a HashMap)>>, pub(crate) query: &'a Arc, diff --git a/apollo-router/src/query_planner/fetch.rs b/apollo-router/src/query_planner/fetch.rs index 9c710c46a5..849c98b505 100644 --- a/apollo-router/src/query_planner/fetch.rs +++ b/apollo-router/src/query_planner/fetch.rs @@ -1,10 +1,11 @@ -use std::collections::HashMap; use std::fmt::Display; use std::sync::Arc; use apollo_compiler::ast; +use apollo_compiler::collections::HashMap; use apollo_compiler::validation::Valid; use apollo_compiler::ExecutableDocument; +use apollo_compiler::Name; use indexmap::IndexSet; use serde::Deserialize; use serde::Serialize; @@ -95,7 +96,12 @@ impl From for OperationKind { } } -pub(crate) type SubgraphSchemas = HashMap>>; +pub(crate) type SubgraphSchemas = HashMap; + +pub(crate) struct SubgraphSchema { + pub(crate) schema: Arc>, + pub(crate) implementers_map: HashMap, +} /// A fetch node. #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] @@ -411,7 +417,7 @@ impl FetchNode { pub(crate) fn deferred_fetches( current_dir: &Path, id: &Option, - deferred_fetches: &HashMap)>>, + deferred_fetches: &std::collections::HashMap)>>, value: &Value, errors: &[Error], ) { @@ -575,7 +581,7 @@ impl FetchNode { subgraph_schemas: &SubgraphSchemas, ) -> Result<(), ValidationErrors> { let schema = &subgraph_schemas[self.service_name.as_ref()]; - self.operation.init_parsed(schema)?; + self.operation.init_parsed(&schema.schema)?; Ok(()) } @@ -585,11 +591,12 @@ impl FetchNode { supergraph_schema_hash: &str, ) -> Result<(), ValidationErrors> { let schema = &subgraph_schemas[self.service_name.as_ref()]; - let doc = self.operation.init_parsed(schema)?; + let doc = self.operation.init_parsed(&schema.schema)?; if let Ok(hash) = QueryHashVisitor::hash_query( - schema, + &schema.schema, supergraph_schema_hash, + &schema.implementers_map, doc, self.operation_name.as_deref(), ) { diff --git a/apollo-router/src/query_planner/query_planner_service.rs b/apollo-router/src/query_planner/query_planner_service.rs index 1daa878aed..0b22fde199 100644 --- a/apollo-router/src/query_planner/query_planner_service.rs +++ b/apollo-router/src/query_planner/query_planner_service.rs @@ -1,6 +1,5 @@ //! Calls out to the apollo-federation crate -use std::collections::HashMap; use std::fmt::Debug; use std::ops::ControlFlow; use std::sync::Arc; @@ -9,7 +8,6 @@ use std::task::Poll; use std::time::Instant; use apollo_compiler::ast; -use apollo_compiler::validation::Valid; use apollo_compiler::Name; use apollo_federation::error::FederationError; use apollo_federation::error::SingleFederationError; @@ -42,6 +40,8 @@ use crate::plugins::telemetry::config::ApolloSignatureNormalizationAlgorithm; use crate::plugins::telemetry::config::Conf as TelemetryConfig; use crate::query_planner::convert::convert_root_query_plan_node; use crate::query_planner::fetch::QueryHash; +use crate::query_planner::fetch::SubgraphSchema; +use crate::query_planner::fetch::SubgraphSchemas; use crate::query_planner::labeler::add_defer_labels; use crate::services::layers::query_analysis::ParsedDocument; use crate::services::layers::query_analysis::ParsedDocumentInner; @@ -67,7 +67,7 @@ const INTERNAL_INIT_ERROR: &str = "internal"; pub(crate) struct QueryPlannerService { planner: Arc, schema: Arc, - subgraph_schemas: Arc>>>, + subgraph_schemas: Arc, configuration: Arc, enable_authorization_directives: bool, _federation_instrument: ObservableGauge, @@ -191,7 +191,15 @@ impl QueryPlannerService { planner .subgraph_schemas() .iter() - .map(|(name, schema)| (name.to_string(), Arc::new(schema.schema().clone()))) + .map(|(name, schema)| { + ( + name.to_string(), + SubgraphSchema { + implementers_map: schema.schema().implementers_map(), + schema: Arc::new(schema.schema().clone()), + }, + ) + }) .collect(), ); @@ -218,9 +226,7 @@ impl QueryPlannerService { self.schema.clone() } - pub(crate) fn subgraph_schemas( - &self, - ) -> Arc>>> { + pub(crate) fn subgraph_schemas(&self) -> Arc { self.subgraph_schemas.clone() } @@ -383,6 +389,7 @@ impl Service for QueryPlannerService { let hash = QueryHashVisitor::hash_query( this.schema.supergraph_schema(), &this.schema.raw_sdl, + &this.schema.implementers_map, &executable_document, operation_name.as_deref(), ) @@ -508,6 +515,7 @@ impl QueryPlannerService { let hash = QueryHashVisitor::hash_query( self.schema.supergraph_schema(), &self.schema.raw_sdl, + &self.schema.implementers_map, &executable_document, key.operation_name.as_deref(), ) @@ -595,6 +603,8 @@ pub(crate) fn metric_rust_qp_init(init_error_kind: Option<&'static str>) { #[cfg(test)] mod tests { + use std::collections::HashMap; + use test_log::test; use tower::ServiceExt; diff --git a/apollo-router/src/query_planner/subscription.rs b/apollo-router/src/query_planner/subscription.rs index 1b8038afd6..32b2aa8c2b 100644 --- a/apollo-router/src/query_planner/subscription.rs +++ b/apollo-router/src/query_planner/subscription.rs @@ -70,7 +70,7 @@ impl SubscriptionNode { subgraph_schemas: &SubgraphSchemas, ) -> Result<(), ValidationErrors> { let schema = &subgraph_schemas[self.service_name.as_ref()]; - self.operation.init_parsed(schema)?; + self.operation.init_parsed(&schema.schema)?; Ok(()) } } diff --git a/apollo-router/src/query_planner/tests.rs b/apollo-router/src/query_planner/tests.rs index e69cbf90de..4494f442bf 100644 --- a/apollo-router/src/query_planner/tests.rs +++ b/apollo-router/src/query_planner/tests.rs @@ -1882,8 +1882,14 @@ fn broken_plan_does_not_panic() { estimated_size: Default::default(), }; let subgraph_schema = apollo_compiler::Schema::parse_and_validate(subgraph_schema, "").unwrap(); - let mut subgraph_schemas = HashMap::new(); - subgraph_schemas.insert("X".to_owned(), Arc::new(subgraph_schema)); + let mut subgraph_schemas = HashMap::default(); + subgraph_schemas.insert( + "X".to_owned(), + query_planner::fetch::SubgraphSchema { + implementers_map: subgraph_schema.implementers_map(), + schema: Arc::new(subgraph_schema), + }, + ); let result = Arc::make_mut(&mut plan.root) .init_parsed_operations_and_hash_subqueries(&subgraph_schemas, ""); assert_eq!( diff --git a/apollo-router/src/router_factory.rs b/apollo-router/src/router_factory.rs index 048b65211e..b3cfebdba7 100644 --- a/apollo-router/src/router_factory.rs +++ b/apollo-router/src/router_factory.rs @@ -311,11 +311,19 @@ impl YamlRouterFactory { let span = tracing::info_span!("plugins"); // Process the plugins. + let subgraph_schemas = Arc::new( + planner + .subgraph_schemas() + .iter() + .map(|(k, v)| (k.clone(), v.schema.clone())) + .collect(), + ); + let plugins: Arc = Arc::new( create_plugins( &configuration, &schema, - planner.subgraph_schemas(), + subgraph_schemas, initial_telemetry_plugin, extra_plugins, ) diff --git a/apollo-router/src/services/connector_service.rs b/apollo-router/src/services/connector_service.rs index 10cf5d8b8e..4ad2e19d33 100644 --- a/apollo-router/src/services/connector_service.rs +++ b/apollo-router/src/services/connector_service.rs @@ -1,11 +1,9 @@ //! Tower service for connectors. -use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use std::task::Poll; -use apollo_compiler::validation::Valid; use apollo_federation::sources::connect::Connector; use futures::future::BoxFuture; use indexmap::IndexMap; @@ -37,6 +35,7 @@ use crate::plugins::connectors::tracing::connect_spec_version_instrument; use crate::plugins::connectors::tracing::CONNECTOR_TYPE_HTTP; use crate::plugins::subscription::SubscriptionConfig; use crate::plugins::telemetry::consts::CONNECT_SPAN_NAME; +use crate::query_planner::fetch::SubgraphSchemas; use crate::services::router::body::RouterBody; use crate::services::ConnectRequest; use crate::services::ConnectResponse; @@ -63,7 +62,7 @@ pub(crate) const CONNECTOR_INFO_CONTEXT_KEY: &str = "apollo_router::connector::i pub(crate) struct ConnectorService { pub(crate) http_service_factory: Arc>, pub(crate) _schema: Arc, - pub(crate) _subgraph_schemas: Arc>>>, + pub(crate) _subgraph_schemas: Arc, pub(crate) _subscription_config: Option, pub(crate) connectors_by_service_name: Arc, Connector>>, } @@ -312,7 +311,7 @@ fn handle_subrequest_http_error(err: BoxError, connector: &Connector) -> BoxErro #[derive(Clone)] pub(crate) struct ConnectorServiceFactory { pub(crate) schema: Arc, - pub(crate) subgraph_schemas: Arc>>>, + pub(crate) subgraph_schemas: Arc, pub(crate) http_service_factory: Arc>, pub(crate) subscription_config: Option, pub(crate) connectors_by_service_name: Arc, Connector>>, @@ -322,7 +321,7 @@ pub(crate) struct ConnectorServiceFactory { impl ConnectorServiceFactory { pub(crate) fn new( schema: Arc, - subgraph_schemas: Arc>>>, + subgraph_schemas: Arc, http_service_factory: Arc>, subscription_config: Option, connectors_by_service_name: Arc, Connector>>, diff --git a/apollo-router/src/services/execution/service.rs b/apollo-router/src/services/execution/service.rs index 76cf8c1b80..b45ad5519f 100644 --- a/apollo-router/src/services/execution/service.rs +++ b/apollo-router/src/services/execution/service.rs @@ -1,6 +1,5 @@ //! Implements the Execution phase of the request lifecycle. -use std::collections::HashMap; use std::future::ready; use std::pin::Pin; use std::sync::Arc; @@ -9,7 +8,6 @@ use std::task::Poll; use std::time::SystemTime; use std::time::UNIX_EPOCH; -use apollo_compiler::validation::Valid; use futures::future::BoxFuture; use futures::stream::once; use futures::Stream; @@ -47,6 +45,7 @@ use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN; use crate::plugins::telemetry::apollo::Config as ApolloTelemetryConfig; use crate::plugins::telemetry::config::ApolloMetricsReferenceMode; use crate::plugins::telemetry::Telemetry; +use crate::query_planner::fetch::SubgraphSchemas; use crate::query_planner::subscription::SubscriptionHandle; use crate::services::execution; use crate::services::fetch_service::FetchServiceFactory; @@ -62,7 +61,7 @@ use crate::spec::Schema; #[derive(Clone)] pub(crate) struct ExecutionService { pub(crate) schema: Arc, - pub(crate) subgraph_schemas: Arc>>>, + pub(crate) subgraph_schemas: Arc, pub(crate) fetch_service_factory: Arc, /// Subscription config if enabled subscription_config: Option, @@ -632,7 +631,7 @@ async fn consume_responses( #[derive(Clone)] pub(crate) struct ExecutionServiceFactory { pub(crate) schema: Arc, - pub(crate) subgraph_schemas: Arc>>>, + pub(crate) subgraph_schemas: Arc, pub(crate) plugins: Arc, pub(crate) fetch_service_factory: Arc, } diff --git a/apollo-router/src/services/fetch_service.rs b/apollo-router/src/services/fetch_service.rs index 32562f9d93..6a852cf22e 100644 --- a/apollo-router/src/services/fetch_service.rs +++ b/apollo-router/src/services/fetch_service.rs @@ -1,11 +1,9 @@ //! Tower fetcher for fetch node execution. -use std::collections::HashMap; use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Poll; -use apollo_compiler::validation::Valid; use futures::future::BoxFuture; use serde_json_bytes::Value; use tokio::sync::mpsc; @@ -29,6 +27,7 @@ use crate::http_ext; use crate::plugins::subscription::SubscriptionConfig; use crate::query_planner::build_operation_with_aliasing; use crate::query_planner::fetch::FetchNode; +use crate::query_planner::fetch::SubgraphSchemas; use crate::query_planner::subscription::SubscriptionNode; use crate::query_planner::subscription::OPENED_SUBSCRIPTIONS; use crate::query_planner::OperationKind; @@ -47,7 +46,7 @@ use crate::spec::Schema; pub(crate) struct FetchService { pub(crate) subgraph_service_factory: Arc, pub(crate) schema: Arc, - pub(crate) subgraph_schemas: Arc>>>, + pub(crate) subgraph_schemas: Arc, pub(crate) _subscription_config: Option, // TODO: add subscription support to FetchService pub(crate) connector_service_factory: Arc, } @@ -166,7 +165,7 @@ impl FetchService { fn fetch_with_subgraph_service( schema: Arc, subgraph_service_factory: Arc, - subgraph_schemas: Arc>>>, + subgraph_schemas: Arc, request: FetchRequest, ) -> BoxFuture<'static, Result> { let FetchRequest { @@ -195,7 +194,7 @@ impl FetchService { let alias_query_string; // this exists outside the if block to allow the as_str() to be longer lived let aliased_operation = if let Some(ctx_arg) = &variables.contextual_arguments { if let Some(subgraph_schema) = subgraph_schemas.get(&service_name.to_string()) { - match build_operation_with_aliasing(operation, ctx_arg, subgraph_schema) { + match build_operation_with_aliasing(operation, ctx_arg, &subgraph_schema.schema) { Ok(op) => { alias_query_string = op.serialize().no_indent().to_string(); alias_query_string.as_str() @@ -456,7 +455,7 @@ impl FetchService { #[derive(Clone)] pub(crate) struct FetchServiceFactory { pub(crate) schema: Arc, - pub(crate) subgraph_schemas: Arc>>>, + pub(crate) subgraph_schemas: Arc, pub(crate) subgraph_service_factory: Arc, pub(crate) subscription_config: Option, pub(crate) connector_service_factory: Arc, @@ -465,7 +464,7 @@ pub(crate) struct FetchServiceFactory { impl FetchServiceFactory { pub(crate) fn new( schema: Arc, - subgraph_schemas: Arc>>>, + subgraph_schemas: Arc, subgraph_service_factory: Arc, subscription_config: Option, connector_service_factory: Arc, diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index e947b5eb39..2c01813446 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -559,7 +559,14 @@ async fn subscription_task( // If the configuration was dropped in the meantime, we ignore this update and will // pick up the next one. if let Some(conf) = new_configuration.upgrade() { - let plugins = match create_plugins(&conf, &execution_service_factory.schema, execution_service_factory.subgraph_schemas.clone(), None, None).await { + let subgraph_schemas = Arc::new( + execution_service_factory + .subgraph_schemas + .iter() + .map(|(k, v)| (k.clone(), v.schema.clone())) + .collect(), + ); + let plugins = match create_plugins(&conf, &execution_service_factory.schema, subgraph_schemas, None, None).await { Ok(plugins) => Arc::new(plugins), Err(err) => { tracing::error!("cannot re-create plugins with the new configuration (closing existing subscription): {err:?}"); diff --git a/apollo-router/src/spec/query.rs b/apollo-router/src/spec/query.rs index 4f83d7c8e3..6ddf3ac1df 100644 --- a/apollo-router/src/spec/query.rs +++ b/apollo-router/src/spec/query.rs @@ -269,6 +269,7 @@ impl Query { let hash = QueryHashVisitor::hash_query( schema.supergraph_schema(), &schema.raw_sdl, + &schema.implementers_map, &executable_document, operation_name, ) @@ -323,10 +324,13 @@ impl Query { let operation = get_operation(document, operation_name)?; let operation = Operation::from_hir(&operation, schema, &mut defer_stats, &fragments)?; - let mut visitor = - QueryHashVisitor::new(schema.supergraph_schema(), &schema.raw_sdl, document).map_err( - |e| SpecError::QueryHashing(format!("could not calculate the query hash: {e}")), - )?; + let mut visitor = QueryHashVisitor::new( + schema.supergraph_schema(), + &schema.raw_sdl, + &schema.implementers_map, + document, + ) + .map_err(|e| SpecError::QueryHashing(format!("could not calculate the query hash: {e}")))?; traverse::document(&mut visitor, document, operation_name).map_err(|e| { SpecError::QueryHashing(format!("could not calculate the query hash: {e}")) })?; diff --git a/apollo-router/src/spec/query/change.rs b/apollo-router/src/spec/query/change.rs index 79e6369499..e8da0caa39 100644 --- a/apollo-router/src/spec/query/change.rs +++ b/apollo-router/src/spec/query/change.rs @@ -39,13 +39,13 @@ //! //! This prevents possible collision while hashing multiple things in a sequence. The //! `^` character cannot be present in GraphQL names, so this is a good separator. -use std::collections::HashMap; -use std::collections::HashSet; use std::hash::Hash; use std::hash::Hasher; use apollo_compiler::ast; use apollo_compiler::ast::FieldDefinition; +use apollo_compiler::collections::HashMap; +use apollo_compiler::collections::HashSet; use apollo_compiler::executable; use apollo_compiler::parser::Parser; use apollo_compiler::schema; @@ -79,6 +79,7 @@ pub(crate) struct QueryHashVisitor<'a> { // For now, introspection is still handled by the planner, so when an // introspection query is hashed, it should take the whole schema into account schema_str: &'a str, + implementers_map: &'a HashMap, hasher: Sha256, fragments: HashMap<&'a Name, &'a Node>, hashed_types: HashSet, @@ -95,15 +96,17 @@ impl<'a> QueryHashVisitor<'a> { pub(crate) fn new( schema: &'a schema::Schema, schema_str: &'a str, + implementers_map: &'a HashMap, executable: &'a executable::ExecutableDocument, ) -> Result { let mut visitor = Self { schema, schema_str, + implementers_map, hasher: Sha256::new(), fragments: executable.fragments.iter().collect(), - hashed_types: HashSet::new(), - hashed_field_definitions: HashSet::new(), + hashed_types: HashSet::default(), + hashed_field_definitions: HashSet::default(), seen_introspection: false, // should we just return an error if we do not find those directives? join_field_directive_name: Schema::directive_name( @@ -124,7 +127,7 @@ impl<'a> QueryHashVisitor<'a> { ">=0.1.0", CONTEXT_DIRECTIVE_NAME, ), - contexts: HashMap::new(), + contexts: HashMap::default(), }; visitor.hash_schema()?; @@ -147,10 +150,11 @@ impl<'a> QueryHashVisitor<'a> { pub(crate) fn hash_query( schema: &'a schema::Schema, schema_str: &'a str, + implementers_map: &'a HashMap, executable: &'a executable::ExecutableDocument, operation_name: Option<&str>, ) -> Result, BoxError> { - let mut visitor = QueryHashVisitor::new(schema, schema_str, executable)?; + let mut visitor = QueryHashVisitor::new(schema, schema_str, implementers_map, executable)?; traverse::document(&mut visitor, executable, operation_name)?; // hash the entire query string to prevent collisions executable.to_string().hash(&mut visitor); @@ -326,7 +330,7 @@ impl<'a> QueryHashVisitor<'a> { } "^IMPLEMENTED_INTERFACES_LIST_END".hash(self); - if let Some(implementers) = self.schema().implementers_map().get(&i.name) { + if let Some(implementers) = self.implementers_map.get(&i.name) { "^IMPLEMENTER_OBJECT_LIST".hash(self); for object in &implementers.objects { @@ -651,7 +655,7 @@ impl<'a> QueryHashVisitor<'a> { ) -> Result<(), BoxError> { "^INTERFACE_IMPL".hash(self); - if let Some(implementers) = self.schema.implementers_map().get(&intf.name) { + if let Some(implementers) = self.implementers_map.get(&intf.name) { "^IMPLEMENTER_LIST".hash(self); for object in &implementers.objects { self.hash_type_by_name(object)?; @@ -840,12 +844,17 @@ mod tests { .unwrap() .validate(&schema) .unwrap(); - let mut visitor = QueryHashVisitor::new(&schema, schema_str, &exec).unwrap(); + let implementers_map = schema.implementers_map(); + let mut visitor = + QueryHashVisitor::new(&schema, schema_str, &implementers_map, &exec).unwrap(); traverse::document(&mut visitor, &exec, None).unwrap(); ( hex::encode(visitor.finish()), - hex::encode(QueryHashVisitor::hash_query(&schema, schema_str, &exec, None).unwrap()), + hex::encode( + QueryHashVisitor::hash_query(&schema, schema_str, &implementers_map, &exec, None) + .unwrap(), + ), ) .into() } @@ -859,7 +868,9 @@ mod tests { .unwrap() .validate(&schema) .unwrap(); - let mut visitor = QueryHashVisitor::new(&schema, schema_str, &exec).unwrap(); + let implementers_map = schema.implementers_map(); + let mut visitor = + QueryHashVisitor::new(&schema, schema_str, &implementers_map, &exec).unwrap(); traverse::document(&mut visitor, &exec, None).unwrap(); hex::encode(visitor.finish()) diff --git a/docs/source/routing/performance/caching/entity.mdx b/docs/source/routing/performance/caching/entity.mdx index 64618ca2d5..bf2f7e6826 100644 --- a/docs/source/routing/performance/caching/entity.mdx +++ b/docs/source/routing/performance/caching/entity.mdx @@ -216,16 +216,15 @@ This entry contains an object with the `all` field to affect all subgraph reques You can invalidate entity cache entries with a [specifically formatted request](#invalidation-request-format once you [configure your router](#configuration) appropriately. For example, if price data changes before a price entity's TTL expires, you can send an invalidation request. ```mermaid - flowchart RL subgraph QueryResponse["Cache invalidation POST"] n1["{ -   "kind": "subgraph", -   "subgraph": "price", -   "type": "Price", -   "key": { -     "id": "101" -   } +     "kind": "subgraph", +     "subgraph": "price", +     "type": "Price", +     "key": { +         "id": "101" +     } }"] end @@ -236,18 +235,18 @@ flowchart RL end subgraph PriceQueryFragment["Price Query Fragment (e.g. TTL 2200)"] - n2[" ̶{̶ -   " ̶p̶r̶i̶c̶e̶": ̶{̶ -     " ̶i̶d̶": ̶1̶0̶1̶, -     " ̶p̶r̶o̶d̶u̶c̶t̶_̶i̶d̶": ̶1̶2̶, -     " ̶a̶m̶o̶u̶n̶t̶": ̶1̶5̶0̶0̶, -     "̶c̶u̶r̶r̶e̶n̶c̶y̶_̶c̶o̶d̶e̶": " ̶U̶S̶D̶" -    ̶}̶ - ̶}̶"] + n2["{ +     "price": { +         "id": 101, +         "product_id": 12, +         "amount": 1500, +         "currency_code": "USD" +     } + }"] end Router - Database[("   ")] + Database[("    ")] QueryResponse --> Router Purchases --> Router diff --git a/docs/source/routing/performance/caching/in-memory.mdx b/docs/source/routing/performance/caching/in-memory.mdx index fbadf23a4d..159539e6b6 100644 --- a/docs/source/routing/performance/caching/in-memory.mdx +++ b/docs/source/routing/performance/caching/in-memory.mdx @@ -40,7 +40,7 @@ supergraph: ### Cache warm-up -When loading a new schema, a query plan might change for some queries, so cached query plans cannot be reused. +When loading a new schema, a query plan might change for some queries, so cached query plans cannot be reused. To prevent increased latency upon query plan cache invalidation, the router precomputes query plans for the most used queries from the cache when a new schema is loaded. @@ -80,19 +80,6 @@ then look at `apollo_router_schema_loading_time` and `apollo.router.query_planni If the router is using distributed caching for query plans, the warm-up phase will also store the new query plans in Redis. Since all Router instances might have the same distributions of queries in their in-memory cache, the list of queries is shuffled before warm-up, so each Router instance can plan queries in a different order and share their results through the cache. -#### Schema aware query hashing - -The query plan cache key uses a hashing algorithm specifically designed for GraphQL queries, using the schema. If a schema update does not affect a query (example: a field was added), then the query hash will stay the same. The query plan cache can use that key during warm up to check if a cached entry can be reused instead of planning it again. - -It can be activated through this option: - -```yaml title="router.yaml" -supergraph: - query_planning: - warmed_up_queries: 100 - experimental_reuse_query_plans: true -``` - ## Caching automatic persisted queries (APQ) [Automatic Persisted Queries (**APQ**)](/apollo-server/performance/apq/) enable GraphQL clients to send a server the _hash_ of their query string, _instead of_ sending the query string itself. When query strings are very large, this can significantly reduce network usage.