diff --git a/.changesets/fix_bnjjj_fix_fatal_error_subgraph_response.md b/.changesets/fix_bnjjj_fix_fatal_error_subgraph_response.md new file mode 100644 index 0000000000..9e09db2866 --- /dev/null +++ b/.changesets/fix_bnjjj_fix_fatal_error_subgraph_response.md @@ -0,0 +1,5 @@ +### fix(subgraph_service): when the subgraph connection is closed or in error, return a proper subgraph response ([PR #5859](https://github.com/apollographql/router/pull/5859)) + +When the subgraph connection is closed or in error, return a proper subgraph response containing an error. Previously, this failure prevented the subgraph response service from being triggered in coprocessors and Rhai scripts. + +By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router/pull/5859 \ No newline at end of file diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index c2274f6bc8..5d8fae1ede 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -16,6 +16,7 @@ use http::header::{self}; use http::response::Parts; use http::HeaderValue; use http::Request; +use http::StatusCode; use hyper_rustls::ConfigBuilderExt; use itertools::Itertools; use mediatype::names::APPLICATION; @@ -871,9 +872,34 @@ pub(crate) async fn process_batch( // Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it. 
tracing::debug!("fetching from subgraph: {service}"); let (parts, content_type, body) = - do_fetch(client, &batch_context, &service, request, display_body) + match do_fetch(client, &batch_context, &service, request, display_body) .instrument(subgraph_req_span) - .await?; + .await + { + Ok(res) => res, + Err(err) => { + let resp = http::Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(err.to_graphql_error(None)) + .map_err(|err| FetchError::SubrequestHttpError { + status_code: None, + service: service.clone(), + reason: format!("cannot create the http response from error: {err:?}"), + })?; + let (parts, body) = resp.into_parts(); + let body = + serde_json::to_vec(&body).map_err(|err| FetchError::SubrequestHttpError { + status_code: None, + service: service.clone(), + reason: format!("cannot serialize the error: {err:?}"), + })?; + ( + parts, + Ok(ContentType::ApplicationJson), + Some(Ok(body.into())), + ) + } + }; let subgraph_response_event = batch_context .extensions() @@ -1283,9 +1309,21 @@ pub(crate) async fn call_single_http( // Perform the actual fetch. If this fails then we didn't manage to make the call at all, so we can't do anything with it. 
let (parts, content_type, body) = - do_fetch(client, &context, service_name, request, display_body) + match do_fetch(client, &context, service_name, request, display_body) .instrument(subgraph_req_span) - .await?; + .await + { + Ok(resp) => resp, + Err(err) => { + return Ok(SubgraphResponse::builder() + .subgraph_name(service_name.to_string()) + .error(err.to_graphql_error(None)) + .status_code(StatusCode::INTERNAL_SERVER_ERROR) + .context(context) + .extensions(Object::default()) + .build()); + } + }; let subgraph_response_event = context .extensions() @@ -1705,6 +1743,17 @@ mod tests { server.await.unwrap(); } + // starts a local server emulating a subgraph returning connection closed + async fn emulate_subgraph_panic(listener: TcpListener) { + async fn handle(_request: http::Request) -> Result, Infallible> { + panic!("test") + } + + let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); + let server = Server::from_tcp(listener).unwrap().serve(make_svc); + server.await.unwrap(); + } + // starts a local server emulating a subgraph returning bad response format async fn emulate_subgraph_ok_status_invalid_response(listener: TcpListener) { async fn handle(_request: http::Request) -> Result, Infallible> { @@ -2421,6 +2470,44 @@ mod tests { assert!(response.response.body().errors.is_empty()); } + #[tokio::test(flavor = "multi_thread")] + async fn test_subgraph_service_panic() { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let socket_addr = listener.local_addr().unwrap(); + tokio::task::spawn(emulate_subgraph_panic(listener)); + let subgraph_service = SubgraphService::new( + "test", + true, + None, + Notify::default(), + HttpClientServiceFactory::from_config( + "test", + &Configuration::default(), + Http2Config::Enable, + ), + ) + .expect("can create a SubgraphService"); + + let url = Uri::from_str(&format!("http://{socket_addr}")).unwrap(); + let response = subgraph_service + .oneshot( + 
SubgraphRequest::builder() + .supergraph_request(supergraph_request("query")) + .subgraph_request(subgraph_http_request(url, "query")) + .operation_kind(OperationKind::Query) + .subgraph_name(String::from("test")) + .context(Context::new()) + .build(), + ) + .await + .unwrap(); + assert!(!response.response.body().errors.is_empty()); + assert_eq!( + response.response.body().errors[0].message, + "HTTP fetch failed from 'test': HTTP fetch failed from 'test': connection closed before message completed" + ); + } + #[tokio::test(flavor = "multi_thread")] async fn test_subgraph_service_invalid_response() { let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();