Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changesets/fix_bnjjj_fix_fatal_error_subgraph_response.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### Interrupted subgraph connections trigger error responses and subgraph service hook points ([PR #5859](https://github.com/apollographql/router/pull/5859))

The router now returns a proper subgraph response, with an error if necessary, when a subgraph connection is closed or returns an error.

Previously, such a failure was propagated as an error without producing a subgraph response, which prevented the subgraph response service from being triggered in coprocessors or Rhai scripts.

By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5859
95 changes: 91 additions & 4 deletions apollo-router/src/services/subgraph_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use http::header::{self};
use http::response::Parts;
use http::HeaderValue;
use http::Request;
use http::StatusCode;
use hyper_rustls::ConfigBuilderExt;
use itertools::Itertools;
use mediatype::names::APPLICATION;
Expand Down Expand Up @@ -871,9 +872,34 @@ pub(crate) async fn process_batch(
// Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we build a 500 response whose JSON body is the GraphQL error and keep processing it like any other subgraph response.
tracing::debug!("fetching from subgraph: {service}");
let (parts, content_type, body) =
do_fetch(client, &batch_context, &service, request, display_body)
match do_fetch(client, &batch_context, &service, request, display_body)
.instrument(subgraph_req_span)
.await?;
.await
{
Ok(res) => res,
Err(err) => {
let resp = http::Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(err.to_graphql_error(None))
.map_err(|err| FetchError::SubrequestHttpError {
status_code: None,
service: service.clone(),
reason: format!("cannot create the http response from error: {err:?}"),
})?;
let (parts, body) = resp.into_parts();
let body =
serde_json::to_vec(&body).map_err(|err| FetchError::SubrequestHttpError {
status_code: None,
service: service.clone(),
reason: format!("cannot serialize the error: {err:?}"),
})?;
(
parts,
Ok(ContentType::ApplicationJson),
Some(Ok(body.into())),
)
}
};

let subgraph_response_event = batch_context
.extensions()
Expand Down Expand Up @@ -1283,9 +1309,21 @@ pub(crate) async fn call_single_http(

// Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we return a subgraph response carrying the GraphQL error (status 500) instead of failing the call.
let (parts, content_type, body) =
do_fetch(client, &context, service_name, request, display_body)
match do_fetch(client, &context, service_name, request, display_body)
.instrument(subgraph_req_span)
.await?;
.await
{
Ok(resp) => resp,
Err(err) => {
return Ok(SubgraphResponse::builder()
.subgraph_name(service_name.to_string())
.error(err.to_graphql_error(None))
.status_code(StatusCode::INTERNAL_SERVER_ERROR)
.context(context)
.extensions(Object::default())
.build());
}
};

let subgraph_response_event = context
.extensions()
Expand Down Expand Up @@ -1705,6 +1743,17 @@ mod tests {
server.await.unwrap();
}

// Starts a local server emulating a subgraph whose connection gets closed:
// the request handler panics on every request, so no HTTP response is ever
// written back to the client.
async fn emulate_subgraph_panic(listener: TcpListener) {
    // Handler that never produces a response.
    async fn panicking_handler(
        _req: http::Request<Body>,
    ) -> Result<http::Response<Body>, Infallible> {
        panic!("test")
    }

    // One service per incoming connection, all backed by the panicking handler.
    let service_factory =
        make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(panicking_handler)) });

    Server::from_tcp(listener)
        .unwrap()
        .serve(service_factory)
        .await
        .unwrap();
}

// starts a local server emulating a subgraph returning bad response format
async fn emulate_subgraph_ok_status_invalid_response(listener: TcpListener) {
async fn handle(_request: http::Request<Body>) -> Result<http::Response<Body>, Infallible> {
Expand Down Expand Up @@ -2421,6 +2470,44 @@ mod tests {
assert!(response.response.body().errors.is_empty());
}

// A subgraph whose handler panics (see `emulate_subgraph_panic`) must not make
// the service call itself fail: the call resolves to a response whose body
// carries the fetch error.
#[tokio::test(flavor = "multi_thread")]
async fn test_subgraph_service_panic() {
    // Bind to an ephemeral port and hand the listener to the fake subgraph.
    let tcp_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = tcp_listener.local_addr().unwrap();
    tokio::task::spawn(emulate_subgraph_panic(tcp_listener));

    let notify = Notify::default();
    let http_factory = HttpClientServiceFactory::from_config(
        "test",
        &Configuration::default(),
        Http2Config::Enable,
    );
    let service = SubgraphService::new("test", true, None, notify, http_factory)
        .expect("can create a SubgraphService");

    let subgraph_url = Uri::from_str(&format!("http://{addr}")).unwrap();
    let request = SubgraphRequest::builder()
        .supergraph_request(supergraph_request("query"))
        .subgraph_request(subgraph_http_request(subgraph_url, "query"))
        .operation_kind(OperationKind::Query)
        .subgraph_name(String::from("test"))
        .context(Context::new())
        .build();

    // The call must succeed; the failure is reported inside the response body.
    let response = service.oneshot(request).await.unwrap();

    let errors = &response.response.body().errors;
    assert!(!errors.is_empty());
    assert_eq!(
        errors[0].message,
        "HTTP fetch failed from 'test': HTTP fetch failed from 'test': connection closed before message completed"
    );
}

#[tokio::test(flavor = "multi_thread")]
async fn test_subgraph_service_invalid_response() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
Expand Down
3 changes: 0 additions & 3 deletions apollo-router/tests/integration/file_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,6 @@ async fn it_fails_upload_without_file() -> Result<(), BoxError> {
"errors": [
{
"message": "HTTP fetch failed from 'uploads': HTTP fetch failed from 'uploads': error from user's HttpBody stream: error reading a body from connection: Missing files in the request: '0'.",
"path": [],
"extensions": {
"code": "SUBREQUEST_HTTP_ERROR",
"service": "uploads",
Expand Down Expand Up @@ -648,7 +647,6 @@ async fn it_fails_with_file_size_limit() -> Result<(), BoxError> {
"errors": [
{
"message": "HTTP fetch failed from 'uploads': HTTP fetch failed from 'uploads': error from user's HttpBody stream: error reading a body from connection: Exceeded the limit of 512.0 KB on 'fat.payload.bin' file.",
"path": [],
"extensions": {
"code": "SUBREQUEST_HTTP_ERROR",
"service": "uploads",
Expand Down Expand Up @@ -769,7 +767,6 @@ async fn it_fails_invalid_file_order() -> Result<(), BoxError> {
"errors": [
{
"message": "HTTP fetch failed from 'uploads_clone': HTTP fetch failed from 'uploads_clone': error from user's HttpBody stream: error reading a body from connection: Missing files in the request: '1'.",
"path": [],
"extensions": {
"code": "SUBREQUEST_HTTP_ERROR",
"service": "uploads_clone",
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/tests/integration/telemetry/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ async fn test_span_metrics() -> Result<(), BoxError> {
.get("apollo-custom-trace-id")
.unwrap()
.is_empty());
router.graceful_shutdown().await;
TraceSpec::builder()
.operation_name("ExampleQuery")
.services(["client", "router", "subgraph"].into())
Expand All @@ -381,7 +382,6 @@ async fn test_span_metrics() -> Result<(), BoxError> {
.build()
.validate_trace(id)
.await?;
router.graceful_shutdown().await;
Ok(())
}

Expand Down