Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 11 additions & 24 deletions apollo-router/src/query_planner/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use tracing::instrument;
use tracing::Instrument;

use super::rewrites;
use super::rewrites::DataRewrite;
use super::selection::execute_selection_set;
use super::selection::Selection;
use super::subgraph_context::ContextualArguments;
Expand Down Expand Up @@ -369,12 +368,10 @@ impl Variables {
impl FetchNode {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn subgraph_fetch(
&self,
service: BoxService,
subgraph_request: SubgraphRequest,
service_name: &str,
current_dir: &Path,
requires: &[Selection],
output_rewrites: &Option<Vec<DataRewrite>>,
schema: &Schema,
paths: Vec<Vec<Path>>,
operation_str: &str,
Expand All @@ -393,13 +390,13 @@ impl FetchNode {
FetchError::SubrequestHttpError { .. } => *inner,
_ => FetchError::SubrequestHttpError {
status_code: None,
service: service_name.to_string(),
service: self.service_name.to_string(),
reason: inner.to_string(),
},
},
Err(e) => FetchError::SubrequestHttpError {
status_code: None,
service: service_name.to_string(),
service: self.service_name.to_string(),
reason: e.to_string(),
},
}) {
Expand All @@ -412,27 +409,19 @@ impl FetchNode {
Ok(res) => res.response.into_parts(),
};

super::log::trace_subfetch(service_name, operation_str, &variables, &response);
super::log::trace_subfetch(&self.service_name, operation_str, &variables, &response);

if !response.is_primary() {
return (
Value::default(),
vec![FetchError::SubrequestUnexpectedPatchResponse {
service: service_name.to_string(),
service: self.service_name.to_string(),
}
.to_graphql_error(Some(current_dir.to_owned()))],
);
}

let (value, errors) = Self::response_at_path(
schema,
current_dir,
paths,
response,
requires,
output_rewrites,
service_name,
);
let (value, errors) = self.response_at_path(schema, current_dir, paths, response);
(value, errors)
}

Expand All @@ -455,15 +444,13 @@ impl FetchNode {

#[instrument(skip_all, level = "debug", name = "response_insert")]
pub(crate) fn response_at_path<'a>(
&'a self,
schema: &Schema,
current_dir: &'a Path,
inverted_paths: Vec<Vec<Path>>,
response: graphql::Response,
requires: &[Selection],
output_rewrites: &Option<Vec<DataRewrite>>,
service_name: &str,
) -> (Value, Vec<Error>) {
if !requires.is_empty() {
if !self.requires.is_empty() {
let entities_path = Path(vec![json_ext::PathElement::Key(
"_entities".to_string(),
None,
Expand Down Expand Up @@ -521,7 +508,7 @@ impl FetchNode {
let mut value = Value::default();

for (index, mut entity) in array.into_iter().enumerate() {
rewrites::apply_rewrites(schema, &mut entity, output_rewrites);
rewrites::apply_rewrites(schema, &mut entity, &self.output_rewrites);

if let Some(paths) = inverted_paths.get(index) {
if paths.len() > 1 {
Expand All @@ -547,7 +534,7 @@ impl FetchNode {
if errors.is_empty() {
tracing::warn!(
"Subgraph response from '{}' was missing key `_entities` and had no errors. This is likely a bug in the subgraph.",
service_name
self.service_name
);
}

Expand Down Expand Up @@ -577,7 +564,7 @@ impl FetchNode {
})
.collect();
let mut data = response.data.unwrap_or_default();
rewrites::apply_rewrites(schema, &mut data, output_rewrites);
rewrites::apply_rewrites(schema, &mut data, &self.output_rewrites);
(Value::from_path(current_dir, data), errors)
}
}
Expand Down
64 changes: 22 additions & 42 deletions apollo-router/src/services/fetch_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,15 @@ impl FetchService {
..
} = request;

let FetchNode {
operation,
service_name,
requires,
output_rewrites,
..
} = fetch_node;

let paths = variables.inverted_paths.clone();
let operation = operation.as_parsed().cloned();
let operation = fetch_node.operation.as_parsed().cloned();

Box::pin(async move {
let (_parts, response) = connector_service_factory
.create()
.oneshot(
ConnectRequest::builder()
.service_name(service_name.clone())
.service_name(fetch_node.service_name.clone())
.context(context)
.operation(operation?.clone())
.supergraph_request(supergraph_request)
Expand All @@ -131,15 +123,8 @@ impl FetchService {
.response
.into_parts();

let (value, errors) = FetchNode::response_at_path(
&schema,
&current_dir,
paths,
response,
&requires,
&output_rewrites,
&service_name,
);
let (value, errors) =
fetch_node.response_at_path(&schema, &current_dir, paths, response);
Ok((value, errors))
})
}
Expand All @@ -159,12 +144,10 @@ impl FetchService {
} = request;

let FetchNode {
service_name,
operation,
operation_kind,
operation_name,
requires,
output_rewrites,
ref service_name,
ref operation,
ref operation_kind,
ref operation_name,
..
} = fetch_node;

Expand All @@ -178,7 +161,7 @@ impl FetchService {
let alias_query_string; // this exists outside the if block to allow the as_str() to be longer lived
let aliased_operation = if let Some(ctx_arg) = &variables.contextual_arguments {
if let Some(subgraph_schema) = subgraph_schemas.get(&service_name.to_string()) {
match build_operation_with_aliasing(&operation, ctx_arg, subgraph_schema) {
match build_operation_with_aliasing(operation, ctx_arg, subgraph_schema) {
Ok(op) => {
alias_query_string = op.serialize().no_indent().to_string();
alias_query_string.as_str()
Expand All @@ -203,10 +186,9 @@ impl FetchService {
};

let aqs = aliased_operation.to_string(); // TODO
let sns = service_name.clone();
let current_dir = current_dir.clone();
let service = subgraph_service_factory
.create(&sns)
.create(&service_name.clone())
.expect("we already checked that the service exists during planning; qed");

let mut subgraph_request = SubgraphRequest::builder()
Expand All @@ -226,25 +208,23 @@ impl FetchService {
.expect("it won't fail because the url is correct and already checked; qed"),
)
.subgraph_name(service_name.to_string())
.operation_kind(operation_kind)
.operation_kind(*operation_kind)
.context(context.clone())
.build();
subgraph_request.query_hash = fetch_node.schema_aware_hash.clone();
subgraph_request.authorization = fetch_node.authorization.clone();
Box::pin(async move {
Ok(FetchNode::subgraph_fetch(
service,
subgraph_request,
&sns,
&current_dir,
&requires,
&output_rewrites,
&schema,
variables.inverted_paths,
&aqs,
variables.variables,
)
.await)
Ok(fetch_node
.subgraph_fetch(
service,
subgraph_request,
&current_dir,
&schema,
variables.inverted_paths,
&aqs,
variables.variables,
)
.await)
})
}
}
Expand Down