Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 80 additions & 75 deletions apollo-federation/src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3671,113 +3671,118 @@ impl RebasedFragments {

// Collect used variables from operation types.

fn collect_variables_from_value<'selection>(
value: &'selection executable::Value,
variables: &mut HashSet<&'selection Name>,
) {
match value {
executable::Value::Variable(v) => {
variables.insert(v);
}
executable::Value::List(list) => {
for value in list {
collect_variables_from_value(value, variables);
}
pub(crate) struct VariableCollector<'s> {
variables: HashSet<&'s Name>,
}

impl<'s> VariableCollector<'s> {
pub(crate) fn new() -> Self {
Self {
variables: Default::default(),
}
executable::Value::Object(object) => {
for (_key, value) in object {
collect_variables_from_value(value, variables);
}

fn visit_value(&mut self, value: &'s executable::Value) {
match value {
executable::Value::Variable(v) => {
self.variables.insert(v);
}
executable::Value::List(list) => {
for value in list {
self.visit_value(value);
}
}
executable::Value::Object(object) => {
for (_key, value) in object {
self.visit_value(value);
}
}
_ => {}
}
_ => {}
}
}

fn collect_variables_from_directive<'selection>(
directive: &'selection executable::Directive,
variables: &mut HashSet<&'selection Name>,
) {
for arg in directive.arguments.iter() {
collect_variables_from_value(&arg.value, variables);
fn visit_directive(&mut self, directive: &'s executable::Directive) {
for arg in directive.arguments.iter() {
self.visit_value(&arg.value);
}
}
}

impl Field {
fn collect_variables<'selection>(&'selection self, variables: &mut HashSet<&'selection Name>) {
for arg in self.arguments.iter() {
collect_variables_from_value(&arg.value, variables);
}
for dir in self.directives.iter() {
collect_variables_from_directive(dir, variables);
pub(crate) fn visit_directive_list(&mut self, directives: &'s executable::DirectiveList) {
for dir in directives.iter() {
self.visit_directive(dir);
}
}
}

impl FieldSelection {
fn collect_variables<'selection>(&'selection self, variables: &mut HashSet<&'selection Name>) {
self.field.collect_variables(variables);
if let Some(set) = &self.selection_set {
set.collect_variables(variables);
fn visit_field(&mut self, field: &'s Field) {
for arg in field.arguments.iter() {
self.visit_value(&arg.value);
}
self.visit_directive_list(&field.directives);
}
}

impl InlineFragment {
fn collect_variables<'selection>(&'selection self, variables: &mut HashSet<&'selection Name>) {
for dir in self.directives.iter() {
collect_variables_from_directive(dir, variables);
fn visit_field_selection(&mut self, selection: &'s FieldSelection) {
self.visit_field(&selection.field);
if let Some(set) = &selection.selection_set {
self.visit_selection_set(set);
}
}
}

impl InlineFragmentSelection {
fn collect_variables<'selection>(&'selection self, variables: &mut HashSet<&'selection Name>) {
self.inline_fragment.collect_variables(variables);
self.selection_set.collect_variables(variables);
fn visit_inline_fragment(&mut self, fragment: &'s InlineFragment) {
self.visit_directive_list(&fragment.directives);
}

fn visit_inline_fragment_selection(&mut self, selection: &'s InlineFragmentSelection) {
self.visit_inline_fragment(&selection.inline_fragment);
self.visit_selection_set(&selection.selection_set);
}

fn visit_fragment_spread(&mut self, fragment: &'s FragmentSpread) {
self.visit_directive_list(&fragment.directives);
self.visit_directive_list(&fragment.fragment_directives);
}
}

impl FragmentSpread {
fn collect_variables<'selection>(&'selection self, variables: &mut HashSet<&'selection Name>) {
for dir in self.directives.iter() {
collect_variables_from_directive(dir, variables);
fn visit_fragment_spread_selection(&mut self, selection: &'s FragmentSpreadSelection) {
self.visit_fragment_spread(&selection.spread);
self.visit_selection_set(&selection.selection_set);
}

fn visit_selection(&mut self, selection: &'s Selection) {
match selection {
Selection::Field(field) => self.visit_field_selection(field),
Selection::InlineFragment(frag) => self.visit_inline_fragment_selection(frag),
Selection::FragmentSpread(frag) => self.visit_fragment_spread_selection(frag),
}
for dir in self.fragment_directives.iter() {
collect_variables_from_directive(dir, variables);
}

pub(crate) fn visit_selection_set(&mut self, selection_set: &'s SelectionSet) {
for selection in selection_set.iter() {
self.visit_selection(selection);
}
}
}

impl FragmentSpreadSelection {
fn collect_variables<'selection>(&'selection self, variables: &mut HashSet<&'selection Name>) {
self.spread.collect_variables(variables);
self.selection_set.collect_variables(variables);
/// Consume the collector and return the collected names.
pub(crate) fn into_inner(self) -> HashSet<&'s Name> {
self.variables
}
}

impl Selection {
fn collect_variables<'selection>(&'selection self, variables: &mut HashSet<&'selection Name>) {
match self {
Selection::Field(field) => field.collect_variables(variables),
Selection::InlineFragment(frag) => frag.collect_variables(variables),
Selection::FragmentSpread(frag) => frag.collect_variables(variables),
}
impl Fragment {
/// Returns the variable names that are used by this fragment.
pub(crate) fn used_variables(&self) -> HashSet<&'_ Name> {
let mut collector = VariableCollector::new();
collector.visit_directive_list(&self.directives);
collector.visit_selection_set(&self.selection_set);
collector.into_inner()
}
}

impl SelectionSet {
/// Returns the variable names that are used by this selection set, including through fragment
/// spreads.
pub(crate) fn used_variables(&self) -> HashSet<&'_ Name> {
let mut variables = HashSet::new();
self.collect_variables(&mut variables);
variables
}

fn collect_variables<'selection>(&'selection self, variables: &mut HashSet<&'selection Name>) {
for selection in self.selections.values() {
selection.collect_variables(variables);
}
let mut collector = VariableCollector::new();
collector.visit_selection_set(self);
collector.into_inner()
}
}

Expand Down
2 changes: 1 addition & 1 deletion apollo-federation/src/operation/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ impl SelectionSet {
// over fragment reuse, and so we do not want to invest a lot of time into improving
// fragment reuse. We do the simple, less-than-ideal thing.
if let Some(variable_definitions) = &context.operation_variables {
let fragment_variables = candidate.selection_set.used_variables();
let fragment_variables = candidate.used_variables();
if fragment_variables
.difference(variable_definitions)
.next()
Expand Down
61 changes: 35 additions & 26 deletions apollo-federation/src/query_plan/fetch_dependency_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use apollo_compiler::ast::Type;
use apollo_compiler::collections::IndexMap;
use apollo_compiler::collections::IndexSet;
use apollo_compiler::executable;
use apollo_compiler::executable::DirectiveList;
use apollo_compiler::executable::VariableDefinition;
use apollo_compiler::name;
use apollo_compiler::schema;
Expand Down Expand Up @@ -43,6 +44,7 @@ use crate::operation::Selection;
use crate::operation::SelectionId;
use crate::operation::SelectionMap;
use crate::operation::SelectionSet;
use crate::operation::VariableCollector;
use crate::operation::TYPENAME_FIELD;
use crate::query_graph::extract_subgraphs_from_supergraph::FEDERATION_REPRESENTATIONS_ARGUMENTS_NAME;
use crate::query_graph::extract_subgraphs_from_supergraph::FEDERATION_REPRESENTATIONS_VAR_NAME;
Expand Down Expand Up @@ -2325,6 +2327,7 @@ impl FetchDependencyGraphNode {
query_graph: &QueryGraph,
handled_conditions: &Conditions,
variable_definitions: &[Node<VariableDefinition>],
operation_directives: &Arc<DirectiveList>,
fragments: Option<&mut RebasedFragments>,
operation_name: Option<Name>,
) -> Result<Option<super::PlanNode>, FederationError> {
Expand All @@ -2346,18 +2349,26 @@ impl FetchDependencyGraphNode {
.transpose()?;
let subgraph_schema = query_graph.schema_by_source(&self.subgraph_name)?;

let variable_usages = {
let set = selection.used_variables();
let mut list = set.into_iter().cloned().collect::<Vec<_>>();
list.sort();
list
// Narrow down the variable definitions to only the ones used in the subgraph operation.
let variable_definitions = {
let mut collector = VariableCollector::new();
collector.visit_directive_list(operation_directives);
collector.visit_selection_set(&selection);
let used_variables = collector.into_inner();

variable_definitions
.iter()
.filter(|variable| used_variables.contains(&variable.name))
.cloned()
.collect::<Vec<_>>()
};

let mut operation = if self.is_entity_fetch {
operation_for_entities_fetch(
subgraph_schema,
selection,
variable_definitions,
operation_directives,
&operation_name,
)?
} else {
Expand All @@ -2366,6 +2377,7 @@ impl FetchDependencyGraphNode {
self.root_kind,
selection,
variable_definitions,
operation_directives,
&operation_name,
)?
};
Expand All @@ -2374,6 +2386,17 @@ impl FetchDependencyGraphNode {
{
operation.reuse_fragments(fragments)?;
}

let variable_usages = {
let mut list = operation
.variables
.iter()
.map(|variable| variable.name.clone())
.collect::<Vec<_>>();
list.sort();
list
};

let operation_document = operation.try_into()?;

let node = super::PlanNode::Fetch(Box::new(super::FetchNode {
Expand Down Expand Up @@ -2535,19 +2558,11 @@ impl FetchDependencyGraphNode {
fn operation_for_entities_fetch(
subgraph_schema: &ValidFederationSchema,
selection_set: SelectionSet,
all_variable_definitions: &[Node<VariableDefinition>],
mut variable_definitions: Vec<Node<VariableDefinition>>,
operation_directives: &Arc<DirectiveList>,
operation_name: &Option<Name>,
) -> Result<Operation, FederationError> {
let mut variable_definitions: Vec<Node<VariableDefinition>> =
Vec::with_capacity(all_variable_definitions.len() + 1);
variable_definitions.push(representations_variable_definition(subgraph_schema)?);
let used_variables = selection_set.used_variables();
variable_definitions.extend(
all_variable_definitions
.iter()
.filter(|definition| used_variables.contains(&definition.name))
.cloned(),
);
variable_definitions.insert(0, representations_variable_definition(subgraph_schema)?);

let query_type_name = subgraph_schema.schema().root_operation(OperationType::Query).ok_or_else(||
SingleFederationError::InvalidSubgraph {
Expand Down Expand Up @@ -2611,7 +2626,7 @@ fn operation_for_entities_fetch(
root_kind: SchemaRootDefinitionKind::Query,
name: operation_name.clone(),
variables: Arc::new(variable_definitions),
directives: Default::default(),
directives: Arc::clone(operation_directives),
selection_set,
named_fragments: Default::default(),
})
Expand All @@ -2621,22 +2636,16 @@ fn operation_for_query_fetch(
subgraph_schema: &ValidFederationSchema,
root_kind: SchemaRootDefinitionKind,
selection_set: SelectionSet,
variable_definitions: &[Node<VariableDefinition>],
variable_definitions: Vec<Node<VariableDefinition>>,
operation_directives: &Arc<DirectiveList>,
operation_name: &Option<Name>,
) -> Result<Operation, FederationError> {
let used_variables = selection_set.used_variables();
let variable_definitions = variable_definitions
.iter()
.filter(|definition| used_variables.contains(&definition.name))
.cloned()
.collect();

Ok(Operation {
schema: subgraph_schema.clone(),
root_kind,
name: operation_name.clone(),
variables: Arc::new(variable_definitions),
directives: Default::default(),
directives: Arc::clone(operation_directives),
selection_set,
named_fragments: Default::default(),
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashSet;
use std::sync::Arc;

use apollo_compiler::executable::DirectiveList;
use apollo_compiler::executable::VariableDefinition;
use apollo_compiler::Name;
use apollo_compiler::Node;
Expand Down Expand Up @@ -44,7 +46,8 @@ const FETCH_COST: QueryPlanCost = 1000.0;
const PIPELINING_COST: QueryPlanCost = 100.0;

pub(crate) struct FetchDependencyGraphToQueryPlanProcessor {
variable_definitions: Vec<Node<VariableDefinition>>,
variable_definitions: Arc<Vec<Node<VariableDefinition>>>,
operation_directives: Arc<DirectiveList>,
fragments: Option<RebasedFragments>,
operation_name: Option<Name>,
assigned_defer_labels: Option<HashSet<String>>,
Expand Down Expand Up @@ -241,13 +244,15 @@ fn sequence_cost(values: impl IntoIterator<Item = QueryPlanCost>) -> QueryPlanCo

impl FetchDependencyGraphToQueryPlanProcessor {
pub(crate) fn new(
variable_definitions: Vec<Node<VariableDefinition>>,
variable_definitions: Arc<Vec<Node<VariableDefinition>>>,
operation_directives: Arc<DirectiveList>,
fragments: Option<RebasedFragments>,
operation_name: Option<Name>,
assigned_defer_labels: Option<HashSet<String>>,
) -> Self {
Self {
variable_definitions,
operation_directives,
fragments,
operation_name,
assigned_defer_labels,
Expand Down Expand Up @@ -276,6 +281,7 @@ impl FetchDependencyGraphProcessor<Option<PlanNode>, DeferredDeferBlock>
query_graph,
handled_conditions,
&self.variable_definitions,
&self.operation_directives,
self.fragments.as_mut(),
op_name,
)
Expand Down
3 changes: 2 additions & 1 deletion apollo-federation/src/query_plan/query_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,8 @@ impl QueryPlanner {
None
};
let mut processor = FetchDependencyGraphToQueryPlanProcessor::new(
operation.variables.clone(),
normalized_operation.variables.clone(),
normalized_operation.directives.clone(),
rebased_fragments,
operation.name.clone(),
assigned_defer_labels,
Expand Down
Loading