diff --git a/.changesets/fix_bnjjj_fix_fatal_error_subgraph_response.md b/.changesets/fix_bnjjj_fix_fatal_error_subgraph_response.md
new file mode 100644
index 0000000000..5640a274c1
--- /dev/null
+++ b/.changesets/fix_bnjjj_fix_fatal_error_subgraph_response.md
@@ -0,0 +1,7 @@
+### Interrupted subgraph connections trigger error responses and subgraph service hook points ([PR #5859](https://github.com/apollographql/router/pull/5859))
+
+The router now returns a proper subgraph response, containing an error when necessary, if a subgraph connection is closed or the request to the subgraph fails.
+
+Previously, such a failure was propagated as an error without building a subgraph response, so the subgraph response stage was never triggered in coprocessors or Rhai scripts.
+
+By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5859
\ No newline at end of file
diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs
index c2274f6bc8..5d8fae1ede 100644
--- a/apollo-router/src/services/subgraph_service.rs
+++ b/apollo-router/src/services/subgraph_service.rs
@@ -16,6 +16,7 @@ use http::header::{self};
use http::response::Parts;
use http::HeaderValue;
use http::Request;
+use http::StatusCode;
use hyper_rustls::ConfigBuilderExt;
use itertools::Itertools;
use mediatype::names::APPLICATION;
@@ -871,9 +872,34 @@ pub(crate) async fn process_batch(
// Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it.
tracing::debug!("fetching from subgraph: {service}");
let (parts, content_type, body) =
- do_fetch(client, &batch_context, &service, request, display_body)
+ match do_fetch(client, &batch_context, &service, request, display_body)
.instrument(subgraph_req_span)
- .await?;
+ .await
+ {
+ Ok(res) => res,
+ Err(err) => {
+ let resp = http::Response::builder()
+ .status(StatusCode::INTERNAL_SERVER_ERROR)
+ .body(err.to_graphql_error(None))
+ .map_err(|err| FetchError::SubrequestHttpError {
+ status_code: None,
+ service: service.clone(),
+ reason: format!("cannot create the http response from error: {err:?}"),
+ })?;
+ let (parts, body) = resp.into_parts();
+ let body =
+ serde_json::to_vec(&body).map_err(|err| FetchError::SubrequestHttpError {
+ status_code: None,
+ service: service.clone(),
+ reason: format!("cannot serialize the error: {err:?}"),
+ })?;
+ (
+ parts,
+ Ok(ContentType::ApplicationJson),
+ Some(Ok(body.into())),
+ )
+ }
+ };
let subgraph_response_event = batch_context
.extensions()
@@ -1283,9 +1309,21 @@ pub(crate) async fn call_single_http(
// Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it.
let (parts, content_type, body) =
- do_fetch(client, &context, service_name, request, display_body)
+ match do_fetch(client, &context, service_name, request, display_body)
.instrument(subgraph_req_span)
- .await?;
+ .await
+ {
+ Ok(resp) => resp,
+ Err(err) => {
+ return Ok(SubgraphResponse::builder()
+ .subgraph_name(service_name.to_string())
+ .error(err.to_graphql_error(None))
+ .status_code(StatusCode::INTERNAL_SERVER_ERROR)
+ .context(context)
+ .extensions(Object::default())
+ .build());
+ }
+ };
let subgraph_response_event = context
.extensions()
@@ -1705,6 +1743,17 @@ mod tests {
server.await.unwrap();
}
+    // starts a local server emulating a subgraph whose request handler panics, so the connection is closed before any response is sent
+ async fn emulate_subgraph_panic(listener: TcpListener) {
+ async fn handle(_request: http::Request
) -> Result, Infallible> {
+ panic!("test")
+ }
+
+ let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
+ let server = Server::from_tcp(listener).unwrap().serve(make_svc);
+ server.await.unwrap();
+ }
+
// starts a local server emulating a subgraph returning bad response format
async fn emulate_subgraph_ok_status_invalid_response(listener: TcpListener) {
async fn handle(_request: http::Request) -> Result, Infallible> {
@@ -2421,6 +2470,44 @@ mod tests {
assert!(response.response.body().errors.is_empty());
}
+ #[tokio::test(flavor = "multi_thread")]
+ async fn test_subgraph_service_panic() {
+ let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
+ let socket_addr = listener.local_addr().unwrap();
+ tokio::task::spawn(emulate_subgraph_panic(listener));
+ let subgraph_service = SubgraphService::new(
+ "test",
+ true,
+ None,
+ Notify::default(),
+ HttpClientServiceFactory::from_config(
+ "test",
+ &Configuration::default(),
+ Http2Config::Enable,
+ ),
+ )
+ .expect("can create a SubgraphService");
+
+ let url = Uri::from_str(&format!("http://{socket_addr}")).unwrap();
+ let response = subgraph_service
+ .oneshot(
+ SubgraphRequest::builder()
+ .supergraph_request(supergraph_request("query"))
+ .subgraph_request(subgraph_http_request(url, "query"))
+ .operation_kind(OperationKind::Query)
+ .subgraph_name(String::from("test"))
+ .context(Context::new())
+ .build(),
+ )
+ .await
+ .unwrap();
+ assert!(!response.response.body().errors.is_empty());
+ assert_eq!(
+ response.response.body().errors[0].message,
+ "HTTP fetch failed from 'test': HTTP fetch failed from 'test': connection closed before message completed"
+ );
+ }
+
#[tokio::test(flavor = "multi_thread")]
async fn test_subgraph_service_invalid_response() {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
diff --git a/apollo-router/tests/integration/file_upload.rs b/apollo-router/tests/integration/file_upload.rs
index add9e81a90..af37ac3bbb 100644
--- a/apollo-router/tests/integration/file_upload.rs
+++ b/apollo-router/tests/integration/file_upload.rs
@@ -570,7 +570,6 @@ async fn it_fails_upload_without_file() -> Result<(), BoxError> {
"errors": [
{
"message": "HTTP fetch failed from 'uploads': HTTP fetch failed from 'uploads': error from user's HttpBody stream: error reading a body from connection: Missing files in the request: '0'.",
- "path": [],
"extensions": {
"code": "SUBREQUEST_HTTP_ERROR",
"service": "uploads",
@@ -648,7 +647,6 @@ async fn it_fails_with_file_size_limit() -> Result<(), BoxError> {
"errors": [
{
"message": "HTTP fetch failed from 'uploads': HTTP fetch failed from 'uploads': error from user's HttpBody stream: error reading a body from connection: Exceeded the limit of 512.0 KB on 'fat.payload.bin' file.",
- "path": [],
"extensions": {
"code": "SUBREQUEST_HTTP_ERROR",
"service": "uploads",
@@ -769,7 +767,6 @@ async fn it_fails_invalid_file_order() -> Result<(), BoxError> {
"errors": [
{
"message": "HTTP fetch failed from 'uploads_clone': HTTP fetch failed from 'uploads_clone': error from user's HttpBody stream: error reading a body from connection: Missing files in the request: '1'.",
- "path": [],
"extensions": {
"code": "SUBREQUEST_HTTP_ERROR",
"service": "uploads_clone",
diff --git a/apollo-router/tests/integration/telemetry/datadog.rs b/apollo-router/tests/integration/telemetry/datadog.rs
index 613242ca39..686465849d 100644
--- a/apollo-router/tests/integration/telemetry/datadog.rs
+++ b/apollo-router/tests/integration/telemetry/datadog.rs
@@ -359,6 +359,7 @@ async fn test_span_metrics() -> Result<(), BoxError> {
.get("apollo-custom-trace-id")
.unwrap()
.is_empty());
+ router.graceful_shutdown().await;
TraceSpec::builder()
.operation_name("ExampleQuery")
.services(["client", "router", "subgraph"].into())
@@ -381,7 +382,6 @@ async fn test_span_metrics() -> Result<(), BoxError> {
.build()
.validate_trace(id)
.await?;
- router.graceful_shutdown().await;
Ok(())
}