Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion apollo-router/src/axum_factory/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2278,7 +2278,14 @@ async fn test_supergraph_timeout() {

// we do the entire supergraph rebuilding instead of using `from_supergraph_mock_callback_and_configuration`
// because we need the plugins to apply on the supergraph
let mut plugins = create_plugins(&conf, &schema, planner.subgraph_schemas(), None, None)
let subgraph_schemas = Arc::new(
planner
.subgraph_schemas()
.iter()
.map(|(k, v)| (k.clone(), v.schema.clone()))
.collect(),
);
let mut plugins = create_plugins(&conf, &schema, subgraph_schemas, None, None)
.await
.unwrap();

Expand Down
9 changes: 4 additions & 5 deletions apollo-router/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use tower::ServiceBuilder;
use crate::graphql;
use crate::layers::ServiceBuilderExt;
use crate::notification::Notify;
use crate::query_planner::fetch::SubgraphSchemas;
use crate::router_factory::Endpoint;
use crate::services::execution;
use crate::services::router;
Expand Down Expand Up @@ -75,7 +74,7 @@ pub struct PluginInit<T> {
pub(crate) supergraph_schema: Arc<Valid<Schema>>,

/// The parsed subgraph schemas from the query planner, keyed by subgraph name
pub(crate) subgraph_schemas: Arc<SubgraphSchemas>,
pub(crate) subgraph_schemas: Arc<HashMap<String, Arc<Valid<Schema>>>>,

/// Launch ID
pub(crate) launch_id: Option<Arc<String>>,
Expand Down Expand Up @@ -176,7 +175,7 @@ where
supergraph_sdl: Arc<String>,
supergraph_schema_id: Arc<String>,
supergraph_schema: Arc<Valid<Schema>>,
subgraph_schemas: Option<Arc<SubgraphSchemas>>,
subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
launch_id: Option<Option<Arc<String>>>,
notify: Notify<String, graphql::Response>,
) -> Self {
Expand All @@ -201,7 +200,7 @@ where
supergraph_sdl: Arc<String>,
supergraph_schema_id: Arc<String>,
supergraph_schema: Arc<Valid<Schema>>,
subgraph_schemas: Option<Arc<SubgraphSchemas>>,
subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
launch_id: Option<Arc<String>>,
notify: Notify<String, graphql::Response>,
) -> Result<Self, BoxError> {
Expand All @@ -224,7 +223,7 @@ where
supergraph_sdl: Option<Arc<String>>,
supergraph_schema_id: Option<Arc<String>>,
supergraph_schema: Option<Arc<Valid<Schema>>>,
subgraph_schemas: Option<Arc<SubgraphSchemas>>,
subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
launch_id: Option<Arc<String>>,
notify: Option<Notify<String, graphql::Response>>,
) -> Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ mod tests {
let mut demand_controlled_subgraph_schemas = HashMap::new();
for (subgraph_name, subgraph_schema) in planner.subgraph_schemas().iter() {
let demand_controlled_subgraph_schema =
DemandControlledSchema::new(subgraph_schema.clone()).unwrap();
DemandControlledSchema::new(subgraph_schema.schema.clone()).unwrap();
demand_controlled_subgraph_schemas
.insert(subgraph_name.to_string(), demand_controlled_subgraph_schema);
}
Expand Down
8 changes: 7 additions & 1 deletion apollo-router/src/plugins/include_subgraph_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,13 @@ mod test {
.await
.unwrap();
let schema = planner.schema();
let subgraph_schemas = planner.subgraph_schemas();
let subgraph_schemas = Arc::new(
planner
.subgraph_schemas()
.iter()
.map(|(k, v)| (k.clone(), v.schema.clone()))
.collect(),
);

let builder = PluggableSupergraphServiceBuilder::new(planner);

Expand Down
7 changes: 6 additions & 1 deletion apollo-router/src/plugins/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ impl<T: Into<Box<dyn DynPlugin + 'static>> + 'static> PluginTestHarness<T> {
.supergraph_schema_id(crate::spec::Schema::schema_id(&supergraph_sdl).into())
.supergraph_sdl(supergraph_sdl)
.supergraph_schema(Arc::new(parsed_schema))
.subgraph_schemas(subgraph_schemas)
.subgraph_schemas(Arc::new(
subgraph_schemas
.iter()
.map(|(k, v)| (k.clone(), v.schema.clone()))
.collect(),
))
.notify(Notify::default())
.build();

Expand Down
8 changes: 7 additions & 1 deletion apollo-router/src/plugins/traffic_shaping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,13 @@ mod test {
)
.await
.unwrap();
let subgraph_schemas = planner.subgraph_schemas();
let subgraph_schemas = Arc::new(
planner
.subgraph_schemas()
.iter()
.map(|(k, v)| (k.clone(), v.schema.clone()))
.collect(),
);

let mut builder =
PluggableSupergraphServiceBuilder::new(planner).with_configuration(config.clone());
Expand Down
36 changes: 25 additions & 11 deletions apollo-router/src/query_planner/bridge_query_planner.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
//! Calls out to nodejs query planner

use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Instant;

use apollo_compiler::ast;
use apollo_compiler::validation::Valid;
use apollo_compiler::Name;
use apollo_federation::error::FederationError;
use apollo_federation::error::SingleFederationError;
Expand Down Expand Up @@ -49,6 +47,8 @@ use crate::plugins::telemetry::config::Conf as TelemetryConfig;
use crate::query_planner::convert::convert_root_query_plan_node;
use crate::query_planner::dual_query_planner::BothModeComparisonJob;
use crate::query_planner::fetch::QueryHash;
use crate::query_planner::fetch::SubgraphSchema;
use crate::query_planner::fetch::SubgraphSchemas;
use crate::query_planner::labeler::add_defer_labels;
use crate::services::layers::query_analysis::ParsedDocument;
use crate::services::layers::query_analysis::ParsedDocumentInner;
Expand All @@ -74,7 +74,7 @@ const INTERNAL_INIT_ERROR: &str = "internal";
pub(crate) struct BridgeQueryPlanner {
planner: PlannerMode,
schema: Arc<Schema>,
subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
subgraph_schemas: Arc<SubgraphSchemas>,
configuration: Arc<Configuration>,
enable_authorization_directives: bool,
_federation_instrument: ObservableGauge<u64>,
Expand Down Expand Up @@ -360,17 +360,23 @@ impl PlannerMode {
}
}

async fn subgraphs(
&self,
) -> Result<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>, ServiceBuildError> {
async fn subgraphs(&self) -> Result<SubgraphSchemas, ServiceBuildError> {
let js = match self {
PlannerMode::Js(js) => js,
PlannerMode::Both { js, .. } => js,
PlannerMode::Rust(rust) => {
return Ok(rust
.subgraph_schemas()
.iter()
.map(|(name, schema)| (name.to_string(), Arc::new(schema.schema().clone())))
.map(|(name, schema)| {
(
name.to_string(),
SubgraphSchema {
implementers_map: schema.schema().implementers_map(),
schema: Arc::new(schema.schema().clone()),
},
)
})
.collect())
}
};
Expand All @@ -380,7 +386,13 @@ impl PlannerMode {
.map(|(name, schema_str)| {
let schema = apollo_compiler::Schema::parse_and_validate(schema_str, "")
.map_err(|errors| SchemaError::Validate(errors.into()))?;
Ok((name, Arc::new(schema)))
Ok((
name,
SubgraphSchema {
implementers_map: schema.implementers_map(),
schema: Arc::new(schema),
},
))
})
.collect()
}
Expand Down Expand Up @@ -430,9 +442,7 @@ impl BridgeQueryPlanner {
self.schema.clone()
}

pub(crate) fn subgraph_schemas(
&self,
) -> Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>> {
pub(crate) fn subgraph_schemas(&self) -> Arc<SubgraphSchemas> {
self.subgraph_schemas.clone()
}

Expand Down Expand Up @@ -619,6 +629,7 @@ impl Service<QueryPlannerRequest> for BridgeQueryPlanner {
let hash = QueryHashVisitor::hash_query(
this.schema.supergraph_schema(),
&this.schema.raw_sdl,
&this.schema.implementers_map,
&executable_document,
operation_name.as_deref(),
)
Expand Down Expand Up @@ -738,6 +749,7 @@ impl BridgeQueryPlanner {
let hash = QueryHashVisitor::hash_query(
self.schema.supergraph_schema(),
&self.schema.raw_sdl,
&self.schema.implementers_map,
&executable_document,
key.operation_name.as_deref(),
)
Expand Down Expand Up @@ -832,6 +844,8 @@ pub(crate) fn metric_rust_qp_init(init_error_kind: Option<&'static str>) {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use serde_json::json;
use test_log::test;
use tower::Service;
Expand Down
9 changes: 3 additions & 6 deletions apollo-router/src/query_planner/bridge_query_planner_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
Expand All @@ -7,7 +6,6 @@ use std::sync::Mutex;
use std::task::Poll;
use std::time::Instant;

use apollo_compiler::validation::Valid;
use async_channel::bounded;
use async_channel::Sender;
use futures::future::BoxFuture;
Expand All @@ -27,6 +25,7 @@ use crate::error::QueryPlannerError;
use crate::error::ServiceBuildError;
use crate::introspection::IntrospectionCache;
use crate::metrics::meter_provider;
use crate::query_planner::fetch::SubgraphSchemas;
use crate::query_planner::PlannerMode;
use crate::services::QueryPlannerRequest;
use crate::services::QueryPlannerResponse;
Expand All @@ -40,7 +39,7 @@ pub(crate) struct BridgeQueryPlannerPool {
js_planners: Vec<Arc<Planner<QueryPlanResult>>>,
pool_mode: PoolMode,
schema: Arc<Schema>,
subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
subgraph_schemas: Arc<SubgraphSchemas>,
compute_jobs_queue_size_gauge: Arc<Mutex<Option<ObservableGauge<u64>>>>,
v8_heap_used: Arc<AtomicU64>,
v8_heap_used_gauge: Arc<Mutex<Option<ObservableGauge<u64>>>>,
Expand Down Expand Up @@ -248,9 +247,7 @@ impl BridgeQueryPlannerPool {
self.schema.clone()
}

pub(crate) fn subgraph_schemas(
&self,
) -> Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>> {
pub(crate) fn subgraph_schemas(&self) -> Arc<SubgraphSchemas> {
self.subgraph_schemas.clone()
}

Expand Down
10 changes: 3 additions & 7 deletions apollo-router/src/query_planner/caching_query_planner.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::hash::Hasher;
use std::sync::Arc;
use std::task;

use apollo_compiler::validation::Valid;
use futures::future::BoxFuture;
use indexmap::IndexMap;
use query_planner::QueryPlannerPlugin;
Expand Down Expand Up @@ -72,7 +70,7 @@ pub(crate) struct CachingQueryPlanner<T: Clone> {
>,
delegate: T,
schema: Arc<Schema>,
subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
subgraph_schemas: Arc<SubgraphSchemas>,
plugins: Arc<Plugins>,
enable_authorization_directives: bool,
config_mode_hash: Arc<QueryHash>,
Expand Down Expand Up @@ -105,7 +103,7 @@ where
pub(crate) async fn new(
delegate: T,
schema: Arc<Schema>,
subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
subgraph_schemas: Arc<SubgraphSchemas>,
configuration: &Configuration,
plugins: Plugins,
) -> Result<CachingQueryPlanner<T>, BoxError> {
Expand Down Expand Up @@ -382,9 +380,7 @@ impl CachingQueryPlanner<BridgeQueryPlannerPool> {
self.delegate.js_planners()
}

pub(crate) fn subgraph_schemas(
&self,
) -> Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>> {
pub(crate) fn subgraph_schemas(&self) -> Arc<SubgraphSchemas> {
self.delegate.subgraph_schemas()
}

Expand Down
6 changes: 3 additions & 3 deletions apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;

use apollo_compiler::validation::Valid;
use futures::future::join_all;
use futures::prelude::*;
use tokio::sync::broadcast;
Expand All @@ -23,6 +22,7 @@ use crate::json_ext::Path;
use crate::json_ext::Value;
use crate::json_ext::ValueExt;
use crate::plugins::subscription::SubscriptionConfig;
use crate::query_planner::fetch::SubgraphSchemas;
use crate::query_planner::FlattenNode;
use crate::query_planner::Primary;
use crate::query_planner::CONDITION_ELSE_SPAN_NAME;
Expand Down Expand Up @@ -50,7 +50,7 @@ impl QueryPlan {
service_factory: &'a Arc<SubgraphServiceFactory>,
supergraph_request: &'a Arc<http::Request<Request>>,
schema: &'a Arc<Schema>,
subgraph_schemas: &'a Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
subgraph_schemas: &'a Arc<SubgraphSchemas>,
sender: mpsc::Sender<Response>,
subscription_handle: Option<SubscriptionHandle>,
subscription_config: &'a Option<SubscriptionConfig>,
Expand Down Expand Up @@ -106,7 +106,7 @@ pub(crate) struct ExecutionParameters<'a> {
pub(crate) context: &'a Context,
pub(crate) service_factory: &'a Arc<SubgraphServiceFactory>,
pub(crate) schema: &'a Arc<Schema>,
pub(crate) subgraph_schemas: &'a Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
pub(crate) subgraph_schemas: &'a Arc<SubgraphSchemas>,
pub(crate) supergraph_request: &'a Arc<http::Request<Request>>,
pub(crate) deferred_fetches: &'a HashMap<String, broadcast::Sender<(Value, Vec<Error>)>>,
pub(crate) query: &'a Arc<Query>,
Expand Down
19 changes: 13 additions & 6 deletions apollo-router/src/query_planner/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;

use apollo_compiler::ast;
use apollo_compiler::collections::HashMap;
use apollo_compiler::validation::Valid;
use apollo_compiler::ExecutableDocument;
use apollo_compiler::Name;
use indexmap::IndexSet;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -93,7 +94,12 @@ impl From<ast::OperationType> for OperationKind {
}
}

pub(crate) type SubgraphSchemas = HashMap<String, Arc<Valid<apollo_compiler::Schema>>>;
pub(crate) type SubgraphSchemas = HashMap<String, SubgraphSchema>;

pub(crate) struct SubgraphSchema {
pub(crate) schema: Arc<Valid<apollo_compiler::Schema>>,
pub(crate) implementers_map: HashMap<Name, apollo_compiler::schema::Implementers>,
}

/// A fetch node.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
Expand Down Expand Up @@ -406,7 +412,7 @@ impl FetchNode {
if let Some(subgraph_schema) =
parameters.subgraph_schemas.get(&service_name.to_string())
{
match build_operation_with_aliasing(operation, &ctx_arg, subgraph_schema) {
match build_operation_with_aliasing(operation, &ctx_arg, &subgraph_schema.schema) {
Ok(op) => {
alias_query_string = op.serialize().no_indent().to_string();
alias_query_string.as_str()
Expand Down Expand Up @@ -680,7 +686,7 @@ impl FetchNode {
subgraph_schemas: &SubgraphSchemas,
) -> Result<(), ValidationErrors> {
let schema = &subgraph_schemas[self.service_name.as_ref()];
self.operation.init_parsed(schema)?;
self.operation.init_parsed(&schema.schema)?;
Ok(())
}

Expand All @@ -690,11 +696,12 @@ impl FetchNode {
supergraph_schema_hash: &str,
) -> Result<(), ValidationErrors> {
let schema = &subgraph_schemas[self.service_name.as_ref()];
let doc = self.operation.init_parsed(schema)?;
let doc = self.operation.init_parsed(&schema.schema)?;

if let Ok(hash) = QueryHashVisitor::hash_query(
schema,
&schema.schema,
supergraph_schema_hash,
&schema.implementers_map,
doc,
self.operation_name.as_deref(),
) {
Expand Down
Loading