Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ expression: connectors.by_service_name
},
),
config: None,
max_requests: None,
entity_resolver: None,
},
"connectors-subgraph_Query_user_0": Connector {
Expand Down Expand Up @@ -138,6 +139,7 @@ expression: connectors.by_service_name
},
),
config: None,
max_requests: None,
entity_resolver: Some(
Explicit,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ expression: connectors.by_service_name
},
),
config: None,
max_requests: None,
entity_resolver: None,
},
"connectors_Query_usersByCompany_0": Connector {
Expand Down
3 changes: 1 addition & 2 deletions apollo-federation/src/sources/connect/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ impl HeaderValue {
/// Replace variable references in the header value with the given variable definitions.
///
/// # Errors
/// Returns an error if a variable used in the header value is not defined or if a variable
/// value is not a string.
/// Returns an error if a variable used in the header value is not defined.
pub fn interpolate(&self, vars: &Map<ByteString, JSON>) -> Result<String, String> {
let mut result = String::new();
for part in &self.parts {
Expand Down
24 changes: 24 additions & 0 deletions apollo-router/src/plugins/connectors/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
//! Connectors error types.

use apollo_federation::sources::connect::Connector;

use crate::graphql;
use crate::graphql::ErrorExtension;
use crate::json_ext::Path;

/// Errors that apply to all connector types. These errors represent a problem invoking the
/// connector, as opposed to an error returned from the connector itself.
Expand All @@ -10,6 +14,26 @@ pub(crate) enum Error {
RequestLimitExceeded,
}

impl Error {
/// Create a GraphQL error from this error.
#[must_use]
pub(crate) fn to_graphql_error(
&self,
connector: &Connector,
path: Option<Path>,
) -> crate::error::Error {
let builder = graphql::Error::builder()
.message(self.to_string())
.extension_code(self.extension_code())
.extension("service", connector.id.label.clone());
if let Some(path) = path {
builder.path(path).build()
} else {
builder.build()
}
}
}

impl ErrorExtension for Error {
fn extension_code(&self) -> String {
match self {
Expand Down
66 changes: 35 additions & 31 deletions apollo-router/src/plugins/connectors/handle_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use parking_lot::Mutex;
use serde_json_bytes::ByteString;
use serde_json_bytes::Value;

use crate::error::FetchError;
use crate::graphql;
use crate::graphql::ErrorExtension;
use crate::plugins::connectors::http::Response as ConnectorResponse;
use crate::plugins::connectors::http::Result as ConnectorResult;
use crate::plugins::connectors::make_requests::ResponseKey;
Expand Down Expand Up @@ -46,20 +46,12 @@ pub(crate) async fn handle_responses<T: HttpBody>(
let mut data = serde_json_bytes::Map::new();
let mut errors = Vec::new();
let count = responses.len();
struct ErrorInput {
message: String,
code: String,
}

for response in responses {
let mut error = None;
let response_key = response.key;
match response.result {
ConnectorResult::Err(e) => {
error = Some(ErrorInput {
message: e.to_string(),
code: e.extension_code(),
})
error = Some(e.to_graphql_error(connector, None));
}
ConnectorResult::HttpResponse(response) => {
let (parts, body) = response.into_parts();
Expand Down Expand Up @@ -167,10 +159,18 @@ pub(crate) async fn handle_responses<T: HttpBody>(
}
}
} else {
error = Some(ErrorInput {
message: format!("http error: {}", parts.status),
code: format!("{}", parts.status.as_u16()),
});
error = Some(
FetchError::SubrequestHttpError {
status_code: Some(parts.status.as_u16()),
service: connector.id.label.clone(),
reason: format!(
"{}: {}",
parts.status.as_str(),
parts.status.canonical_reason().unwrap_or("Unknown")
),
}
.to_graphql_error(None),
);
if let Some(ref debug) = debug {
match serde_json::from_slice(body) {
Ok(json_data) => {
Expand Down Expand Up @@ -199,15 +199,7 @@ pub(crate) async fn handle_responses<T: HttpBody>(
}
_ => {}
};

errors.push(
graphql::Error::builder()
.message(error.message)
// todo path: ["_entities", i, "???"]
.extension_code(error.code)
.extension("connector", connector.id.label.clone())
.build(),
);
errors.push(error);
}
}

Expand Down Expand Up @@ -739,29 +731,41 @@ mod tests {
path: None,
errors: [
Error {
message: "http error: 404 Not Found",
message: "HTTP fetch failed from 'test label': 404: Not Found",
locations: [],
path: None,
extensions: {
"connector": String(
"code": String(
"SUBREQUEST_HTTP_ERROR",
),
"service": String(
"test label",
),
"code": String(
"404",
"reason": String(
"404: Not Found",
),
"http": Object({
"status": Number(404),
}),
},
},
Error {
message: "http error: 500 Internal Server Error",
message: "HTTP fetch failed from 'test label': 500: Internal Server Error",
locations: [],
path: None,
extensions: {
"connector": String(
"code": String(
"SUBREQUEST_HTTP_ERROR",
),
"service": String(
"test label",
),
"code": String(
"500",
"reason": String(
"500: Internal Server Error",
),
"http": Object({
"status": Number(500),
}),
},
},
],
Expand Down
14 changes: 9 additions & 5 deletions apollo-router/src/plugins/connectors/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ async fn max_requests() {
{
"message": "Request limit exceeded",
"extensions": {
"connector": "connectors.json http: GET /users/{$args.id!}",
"service": "connectors.json http: GET /users/{$args.id!}",
"code": "REQUEST_LIMIT_EXCEEDED"
}
}
Expand Down Expand Up @@ -343,7 +343,7 @@ async fn source_max_requests() {
{
"message": "Request limit exceeded",
"extensions": {
"connector": "connectors.json http: GET /users/{$args.id!}",
"service": "connectors.json http: GET /users/{$args.id!}",
"code": "REQUEST_LIMIT_EXCEEDED"
}
}
Expand Down Expand Up @@ -538,10 +538,14 @@ async fn basic_errors() {
"data": null,
"errors": [
{
"message": "http error: 404 Not Found",
"message": "HTTP fetch failed from 'connectors.json http: GET /users': 404: Not Found",
"extensions": {
"connector": "connectors.json http: GET /users",
"code": "404"
"code": "SUBREQUEST_HTTP_ERROR",
"service": "connectors.json http: GET /users",
"reason": "404: Not Found",
"http": {
"status": 404
}
}
}
]
Expand Down
25 changes: 15 additions & 10 deletions apollo-router/src/query_planner/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::PlanNode;
use super::QueryPlan;
use crate::axum_factory::CanceledRequest;
use crate::error::Error;
use crate::error::FetchError;
use crate::graphql::Request;
use crate::graphql::Response;
use crate::json_ext::Object;
Expand Down Expand Up @@ -254,16 +255,20 @@ impl PlanNode {
.variables(variables)
.current_dir(current_dir.clone())
.build();
(value, errors) = match service.oneshot(request).await {
Ok(r) => r,
Err(e) => (
Value::Null,
vec![Error::builder()
.message(format!("{:?}", e))
.extension_code("FETCH_SERVICE")
.build()],
),
};
(value, errors) =
match service.oneshot(request).await.map_err(|e| {
FetchError::SubrequestHttpError {
status_code: None,
service: fetch_node.service_name.to_string(),
reason: e.to_string(),
}
}) {
Ok(r) => r,
Err(e) => (
Value::default(),
vec![e.to_graphql_error(Some(current_dir.to_owned()))],
),
};
FetchNode::deferred_fetches(
current_dir,
&fetch_node.id,
Expand Down
33 changes: 29 additions & 4 deletions apollo-router/src/services/fetch_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::task::Poll;

use apollo_compiler::validation::Valid;
use futures::future::BoxFuture;
use serde_json_bytes::Value;
use tower::BoxError;
use tower::ServiceExt;
use tracing::instrument::Instrumented;
Expand All @@ -16,6 +17,7 @@ use super::fetch::BoxService;
use super::new_service::ServiceFactory;
use super::ConnectRequest;
use super::SubgraphRequest;
use crate::error::FetchError;
use crate::graphql::Request as GraphQLRequest;
use crate::http_ext;
use crate::plugins::subscription::SubscriptionConfig;
Expand Down Expand Up @@ -64,6 +66,7 @@ impl tower::Service<FetchRequest> for FetchService {
Self::fetch_with_connector_service(
self.schema.clone(),
self.connector_service_factory.clone(),
connector.id.subgraph_name.clone(),
request,
)
.instrument(tracing::info_span!(
Expand Down Expand Up @@ -93,6 +96,7 @@ impl FetchService {
fn fetch_with_connector_service(
schema: Arc<Schema>,
connector_service_factory: Arc<ConnectorServiceFactory>,
subgraph_name: String,
request: FetchRequest,
) -> BoxFuture<'static, Result<FetchResponse, BoxError>> {
let FetchRequest {
Expand All @@ -108,7 +112,7 @@ impl FetchService {
let operation = fetch_node.operation.as_parsed().cloned();

Box::pin(async move {
let (_parts, response) = connector_service_factory
let (_parts, response) = match connector_service_factory
.create()
.oneshot(
ConnectRequest::builder()
Expand All @@ -119,9 +123,30 @@ impl FetchService {
.variables(variables)
.build(),
)
.await?
.response
.into_parts();
.await
.map_err(|e| match e.downcast::<FetchError>() {
Ok(inner) => match *inner {
FetchError::SubrequestHttpError { .. } => *inner,
_ => FetchError::SubrequestHttpError {
status_code: None,
service: subgraph_name,
reason: inner.to_string(),
},
},
Err(e) => FetchError::SubrequestHttpError {
status_code: None,
service: subgraph_name,
reason: e.to_string(),
},
}) {
Err(e) => {
return Ok((
Value::default(),
vec![e.to_graphql_error(Some(current_dir.to_owned()))],
));
}
Ok(res) => res.response.into_parts(),
};

let (value, errors) =
fetch_node.response_at_path(&schema, &current_dir, paths, response);
Expand Down