Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changesets/fix_bnjjj_fix_fatal_error_subgraph_response.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### fix(subgraph_service): when the subgraph connection is closed or in error, return a proper subgraph response ([PR #5859](https://github.com/apollographql/router/pull/5859))

When the subgraph connection is closed or in error, return a proper subgraph response containing an error. This was preventing subgraph response service to be triggered in coprocessor and rhai.

By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5859
95 changes: 91 additions & 4 deletions apollo-router/src/services/subgraph_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use http::header::{self};
use http::response::Parts;
use http::HeaderValue;
use http::Request;
use http::StatusCode;
use hyper_rustls::ConfigBuilderExt;
use itertools::Itertools;
use mediatype::names::APPLICATION;
Expand Down Expand Up @@ -871,9 +872,34 @@ pub(crate) async fn process_batch(
// Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it.
tracing::debug!("fetching from subgraph: {service}");
let (parts, content_type, body) =
do_fetch(client, &batch_context, &service, request, display_body)
match do_fetch(client, &batch_context, &service, request, display_body)
.instrument(subgraph_req_span)
.await?;
.await
{
Ok(res) => res,
Err(err) => {
let resp = http::Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(err.to_graphql_error(None))
.map_err(|err| FetchError::SubrequestHttpError {
status_code: None,
service: service.clone(),
reason: format!("cannot create the http response from error: {err:?}"),
})?;
let (parts, body) = resp.into_parts();
let body =
serde_json::to_vec(&body).map_err(|err| FetchError::SubrequestHttpError {
status_code: None,
service: service.clone(),
reason: format!("cannot serialize the error: {err:?}"),
})?;
(
parts,
Ok(ContentType::ApplicationJson),
Some(Ok(body.into())),
)
}
};

let subgraph_response_event = batch_context
.extensions()
Expand Down Expand Up @@ -1283,9 +1309,21 @@ pub(crate) async fn call_single_http(

// Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it.
let (parts, content_type, body) =
do_fetch(client, &context, service_name, request, display_body)
match do_fetch(client, &context, service_name, request, display_body)
.instrument(subgraph_req_span)
.await?;
.await
{
Ok(resp) => resp,
Err(err) => {
return Ok(SubgraphResponse::builder()
.subgraph_name(service_name.to_string())
.error(err.to_graphql_error(None))
.status_code(StatusCode::INTERNAL_SERVER_ERROR)
.context(context)
.extensions(Object::default())
.build());
}
};

let subgraph_response_event = context
.extensions()
Expand Down Expand Up @@ -1705,6 +1743,17 @@ mod tests {
server.await.unwrap();
}

// starts a local server emulating a subgraph returning connection closed
async fn emulate_subgraph_panic(listener: TcpListener) {
async fn handle(_request: http::Request<Body>) -> Result<http::Response<Body>, Infallible> {
panic!("test")
}

let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
let server = Server::from_tcp(listener).unwrap().serve(make_svc);
server.await.unwrap();
}

// starts a local server emulating a subgraph returning bad response format
async fn emulate_subgraph_ok_status_invalid_response(listener: TcpListener) {
async fn handle(_request: http::Request<Body>) -> Result<http::Response<Body>, Infallible> {
Expand Down Expand Up @@ -2421,6 +2470,44 @@ mod tests {
assert!(response.response.body().errors.is_empty());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_subgraph_service_panic() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let socket_addr = listener.local_addr().unwrap();
tokio::task::spawn(emulate_subgraph_panic(listener));
let subgraph_service = SubgraphService::new(
"test",
true,
None,
Notify::default(),
HttpClientServiceFactory::from_config(
"test",
&Configuration::default(),
Http2Config::Enable,
),
)
.expect("can create a SubgraphService");

let url = Uri::from_str(&format!("http://{socket_addr}")).unwrap();
let response = subgraph_service
.oneshot(
SubgraphRequest::builder()
.supergraph_request(supergraph_request("query"))
.subgraph_request(subgraph_http_request(url, "query"))
.operation_kind(OperationKind::Query)
.subgraph_name(String::from("test"))
.context(Context::new())
.build(),
)
.await
.unwrap();
assert!(!response.response.body().errors.is_empty());
assert_eq!(
response.response.body().errors[0].message,
"HTTP fetch failed from 'test': HTTP fetch failed from 'test': connection closed before message completed"
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_subgraph_service_invalid_response() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
Expand Down