diff --git a/.changesets/fix_fix_coprocessor_data_null.md b/.changesets/fix_fix_coprocessor_data_null.md new file mode 100644 index 0000000000..973c0f5826 --- /dev/null +++ b/.changesets/fix_fix_coprocessor_data_null.md @@ -0,0 +1,27 @@ +### Fix Parsing of Coprocessor GraphQL Responses ([PR #7141](https://github.com/apollographql/router/pull/7141)) + +Previously Router ignored `data: null` property inside GraphQL response returned by coprocessor. +According to [GraphQL Spectification](https://spec.graphql.org/draft/#sel-FAPHLJCAACEBxlY): + +> If an error was raised during the execution that prevented a valid response, the "data" entry in the response should be null. + +That means if coprocessor returned valid execution error, for example: + +```json +{ + "data": null, + "errors": [{ "message": "Some execution error" }] +} +``` + +Router violated above restriction from GraphQL Specification by returning following response to client: + +```json +{ + "errors": [{ "message": "Some execution error" }] +} +``` + +This fix ensures full compliance with the GraphQL specification by preserving the complete structure of error responses from coprocessors. + +Contributed by [@IvanGoncharov](https://github.com/IvanGoncharov) in [#7141](https://github.com/apollographql/router/pull/7141) diff --git a/apollo-router/src/graphql/mod.rs b/apollo-router/src/graphql/mod.rs index 939a3eb338..7782829b95 100644 --- a/apollo-router/src/graphql/mod.rs +++ b/apollo-router/src/graphql/mod.rs @@ -13,6 +13,7 @@ use futures::Stream; use heck::ToShoutySnakeCase; pub use request::Request; pub use response::IncrementalResponse; +use response::MalformedResponseError; pub use response::Response; use serde::Deserialize; use serde::Serialize; @@ -21,7 +22,6 @@ use serde_json_bytes::Map as JsonMap; use serde_json_bytes::Value; pub(crate) use visitor::ResponseVisitor; -use crate::error::FetchError; use crate::json_ext::Object; use crate::json_ext::Path; pub use crate::json_ext::Path as JsonPath; @@ -124,41 +124,39 @@ impl Error { } } - pub(crate) fn from_value(service_name: &str, value: Value) -> Result { - let mut object = - ensure_object!(value).map_err(|error| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), - reason: format!("invalid error within `errors`: {}", error), - })?; + pub(crate) fn from_value(value: Value) -> Result { + let mut object = ensure_object!(value).map_err(|error| MalformedResponseError { + reason: format!("invalid error within `errors`: {}", error), + })?; let extensions = extract_key_value_from_object!(object, "extensions", Value::Object(o) => o) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: format!("invalid `extensions` within error: {}", err), })? .unwrap_or_default(); - let message = extract_key_value_from_object!(object, "message", Value::String(s) => s) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + let message = match extract_key_value_from_object!(object, "message", Value::String(s) => s) + { + Ok(Some(s)) => Ok(s.as_str().to_string()), + Ok(None) => Err(MalformedResponseError { + reason: "missing required `message` property within error".to_owned(), + }), + Err(err) => Err(MalformedResponseError { reason: format!("invalid `message` within error: {}", err), - })? - .map(|s| s.as_str().to_string()) - .unwrap_or_default(); + }), + }?; let locations = extract_key_value_from_object!(object, "locations") .map(skip_invalid_locations) .map(serde_json_bytes::from_value) .transpose() - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: format!("invalid `locations` within error: {}", err), })? .unwrap_or_default(); let path = extract_key_value_from_object!(object, "path") .map(serde_json_bytes::from_value) .transpose() - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: format!("invalid `path` within error: {}", err), })?; diff --git a/apollo-router/src/graphql/request.rs b/apollo-router/src/graphql/request.rs index 3239e32b86..d18c92f642 100644 --- a/apollo-router/src/graphql/request.rs +++ b/apollo-router/src/graphql/request.rs @@ -6,10 +6,10 @@ use serde::de::DeserializeSeed; use serde::de::Error; use serde_json_bytes::ByteString; use serde_json_bytes::Map as JsonMap; -use serde_json_bytes::Value; use crate::configuration::BatchingMode; use crate::json_ext::Object; +use crate::json_ext::Value; /// A GraphQL `Request` used to represent both supergraph and subgraph requests. #[derive(Clone, Derivative, Serialize, Deserialize, Default)] @@ -169,10 +169,10 @@ impl Request { pub(crate) fn batch_from_urlencoded_query( url_encoded_query: String, ) -> Result, serde_json::Error> { - let value: serde_json::Value = serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) + let value: Value = serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) .map_err(serde_json::Error::custom)?; - Request::process_query_values(&value) + Request::process_query_values(value) } /// Convert Bytes into a GraphQL [`Request`]. @@ -180,23 +180,22 @@ impl Request { /// An error will be produced in the event that the bytes array cannot be /// turned into a valid GraphQL `Request`. pub(crate) fn batch_from_bytes(bytes: &[u8]) -> Result, serde_json::Error> { - let value: serde_json::Value = - serde_json::from_slice(bytes).map_err(serde_json::Error::custom)?; + let value: Value = serde_json::from_slice(bytes).map_err(serde_json::Error::custom)?; - Request::process_batch_values(&value) + Request::process_batch_values(value) } - fn allocate_result_array(value: &serde_json::Value) -> Vec { + fn allocate_result_array(value: &Value) -> Vec { match value.as_array() { Some(array) => Vec::with_capacity(array.len()), None => Vec::with_capacity(1), } } - fn process_batch_values(value: &serde_json::Value) -> Result, serde_json::Error> { - let mut result = Request::allocate_result_array(value); + fn process_batch_values(value: Value) -> Result, serde_json::Error> { + let mut result = Request::allocate_result_array(&value); - if value.is_array() { + if let Value::Array(entries) = value { u64_histogram!( "apollo.router.operations.batching.size", "Number of queries contained within each query batch", @@ -210,24 +209,21 @@ impl Request { 1, mode = BatchingMode::BatchHttpLink.to_string() // Only supported mode right now ); - for entry in value - .as_array() - .expect("We already checked that it was an array") - { - let bytes = serde_json::to_vec(entry)?; + for entry in entries { + let bytes = serde_json::to_vec(&entry)?; result.push(Request::deserialize_from_bytes(&bytes.into())?); } } else { - let bytes = serde_json::to_vec(value)?; + let bytes = serde_json::to_vec(&value)?; result.push(Request::deserialize_from_bytes(&bytes.into())?); } Ok(result) } - fn process_query_values(value: &serde_json::Value) -> Result, serde_json::Error> { - let mut result = Request::allocate_result_array(value); + fn process_query_values(value: Value) -> Result, serde_json::Error> { + let mut result = Request::allocate_result_array(&value); - if value.is_array() { + if let Value::Array(entries) = value { u64_histogram!( "apollo.router.operations.batching.size", "Number of queries contained within each query batch", @@ -241,46 +237,37 @@ impl Request { 1, mode = BatchingMode::BatchHttpLink.to_string() // Only supported mode right now ); - for entry in value - .as_array() - .expect("We already checked that it was an array") - { - result.push(Request::process_value(entry)?); + for entry in entries { + result.push(Request::process_value(&entry)?); } } else { - result.push(Request::process_value(value)?) + result.push(Request::process_value(&value)?) } Ok(result) } - fn process_value(value: &serde_json::Value) -> Result { - let operation_name = - if let Some(serde_json::Value::String(operation_name)) = value.get("operationName") { - Some(operation_name.clone()) - } else { - None - }; - - let query = if let Some(serde_json::Value::String(query)) = value.get("query") { - Some(query.as_str()) - } else { - None - }; - let variables: Object = get_from_urlencoded_value(value, "variables")?.unwrap_or_default(); - let extensions: Object = - get_from_urlencoded_value(value, "extensions")?.unwrap_or_default(); - - let request_builder = Self::builder() + fn process_value(value: &Value) -> Result { + let operation_name = value.get("operationName").and_then(Value::as_str); + let query = value.get("query").and_then(Value::as_str).map(String::from); + let variables: Object = value + .get("variables") + .and_then(Value::as_str) + .map(serde_json::from_str) + .transpose()? + .unwrap_or_default(); + let extensions: Object = value + .get("extensions") + .and_then(Value::as_str) + .map(serde_json::from_str) + .transpose()? + .unwrap_or_default(); + + let request = Self::builder() + .and_query(query) .variables(variables) .and_operation_name(operation_name) - .extensions(extensions); - - let request = if let Some(query_str) = query { - request_builder.query(query_str).build() - } else { - request_builder.build() - }; - + .extensions(extensions) + .build(); Ok(request) } @@ -290,25 +277,13 @@ impl Request { /// An error will be produced in the event that the query string parameters /// cannot be turned into a valid GraphQL `Request`. pub fn from_urlencoded_query(url_encoded_query: String) -> Result { - let urldecoded: serde_json::Value = - serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) - .map_err(serde_json::Error::custom)?; + let urldecoded: Value = serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) + .map_err(serde_json::Error::custom)?; Request::process_value(&urldecoded) } } -fn get_from_urlencoded_value<'a, T: Deserialize<'a>>( - object: &'a serde_json::Value, - key: &str, -) -> Result, serde_json::Error> { - if let Some(serde_json::Value::String(byte_string)) = object.get(key) { - Some(serde_json::from_str(byte_string.as_str())).transpose() - } else { - Ok(None) - } -} - struct RequestFromBytesSeed<'data>(&'data Bytes); impl<'de> DeserializeSeed<'de> for RequestFromBytesSeed<'_> { diff --git a/apollo-router/src/graphql/response.rs b/apollo-router/src/graphql/response.rs index c4344c05c9..f14c2762bf 100644 --- a/apollo-router/src/graphql/response.rs +++ b/apollo-router/src/graphql/response.rs @@ -3,18 +3,25 @@ use std::time::Instant; use apollo_compiler::response::ExecutionResponse; use bytes::Bytes; +use displaydoc::Display; use serde::Deserialize; use serde::Serialize; use serde_json_bytes::ByteString; use serde_json_bytes::Map; use crate::error::Error; -use crate::error::FetchError; use crate::graphql::IntoGraphQLErrors; use crate::json_ext::Object; use crate::json_ext::Path; use crate::json_ext::Value; +#[derive(thiserror::Error, Display, Debug, Eq, PartialEq)] +#[error("GraphQL response was malformed: {reason}")] +pub(crate) struct MalformedResponseError { + /// The reason the deserialization failed. + pub(crate) reason: String, +} + /// A graphql primary response. /// Used for federated and subgraph queries. #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Default)] @@ -97,63 +104,50 @@ impl Response { /// Create a [`Response`] from the supplied [`Bytes`]. /// /// This will return an error (identifying the faulty service) if the input is invalid. - pub(crate) fn from_bytes(service_name: &str, b: Bytes) -> Result { - let value = - Value::from_bytes(b).map_err(|error| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), - reason: error.to_string(), - })?; - let object = - ensure_object!(value).map_err(|error| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), - reason: error.to_string(), - })?; - Response::from_object(service_name, object) + pub(crate) fn from_bytes(b: Bytes) -> Result { + let value = Value::from_bytes(b).map_err(|error| MalformedResponseError { + reason: error.to_string(), + })?; + Response::from_value(value) } - pub(crate) fn from_object( - service_name: &str, - mut object: Object, - ) -> Result { + pub(crate) fn from_value(value: Value) -> Result { + let mut object = ensure_object!(value).map_err(|error| MalformedResponseError { + reason: error.to_string(), + })?; let data = object.remove("data"); let errors = extract_key_value_from_object!(object, "errors", Value::Array(v) => v) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })? .into_iter() .flatten() - .map(|v| Error::from_value(service_name, v)) - .collect::, FetchError>>()?; + .map(Error::from_value) + .collect::, MalformedResponseError>>()?; let extensions = extract_key_value_from_object!(object, "extensions", Value::Object(o) => o) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })? .unwrap_or_default(); let label = extract_key_value_from_object!(object, "label", Value::String(s) => s) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })? .map(|s| s.as_str().to_string()); let path = extract_key_value_from_object!(object, "path") .map(serde_json_bytes::from_value) .transpose() - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })?; let has_next = extract_key_value_from_object!(object, "hasNext", Value::Bool(b) => b) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })?; let incremental = extract_key_value_from_object!(object, "incremental", Value::Array(a) => a).map_err( - |err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + |err| MalformedResponseError { reason: err.to_string(), }, )?; @@ -162,8 +156,7 @@ impl Response { .into_iter() .map(serde_json_bytes::from_value) .collect::, _>>() - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })?, None => vec![], @@ -172,8 +165,7 @@ impl Response { // If the data entry in the response is not present, the errors entry in the response must not be empty. // It must contain at least one error. The errors it contains should indicate why no data was able to be returned. if data.is_none() && errors.is_empty() { - return Err(FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + return Err(MalformedResponseError { reason: "graphql response without data must contain at least one error".to_string(), }); } @@ -481,13 +473,21 @@ mod tests { #[test] fn test_no_data_and_no_errors() { - let response = Response::from_bytes("test", "{\"errors\":null}".into()); + let response = Response::from_bytes("{\"errors\":null}".into()); assert_eq!( response.expect_err("no data and no errors"), - FetchError::SubrequestMalformedResponse { - service: "test".to_string(), + MalformedResponseError { reason: "graphql response without data must contain at least one error".to_string(), } ); } + + #[test] + fn test_data_null() { + let response = Response::from_bytes("{\"data\":null}".into()).unwrap(); + assert_eq!( + response, + Response::builder().data(Some(Value::Null)).build(), + ); + } } diff --git a/apollo-router/src/graphql/visitor.rs b/apollo-router/src/graphql/visitor.rs index e400d71979..0f25af1030 100644 --- a/apollo-router/src/graphql/visitor.rs +++ b/apollo-router/src/graphql/visitor.rs @@ -135,7 +135,7 @@ mod tests { let schema = Schema::parse_and_validate(schema_str, "").unwrap(); let request = ExecutableDocument::parse(&schema, query_str, "").unwrap(); - let response = Response::from_bytes("test", Bytes::from_static(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from_static(response_bytes)).unwrap(); let mut visitor = FieldCounter::new(); visitor.visit(&request, &response, &Default::default()); @@ -150,7 +150,7 @@ mod tests { let schema = Schema::parse_and_validate(schema_str, "").unwrap(); let request = ExecutableDocument::parse(&schema, query_str, "").unwrap(); - let response = Response::from_bytes("test", Bytes::from_static(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from_static(response_bytes)).unwrap(); let mut visitor = FieldCounter::new(); visitor.visit(&request, &response, &Default::default()); @@ -165,7 +165,7 @@ mod tests { let schema = Schema::parse_and_validate(schema_str, "").unwrap(); let request = ExecutableDocument::parse(&schema, query_str, "").unwrap(); - let response = Response::from_bytes("test", Bytes::from_static(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from_static(response_bytes)).unwrap(); let mut visitor = FieldCounter::new(); visitor.visit(&request, &response, &Default::default()); @@ -180,7 +180,7 @@ mod tests { let schema = Schema::parse_and_validate(schema_str, "").unwrap(); let request = ExecutableDocument::parse(&schema, query_str, "").unwrap(); - let response = Response::from_bytes("test", Bytes::from_static(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from_static(response_bytes)).unwrap(); let mut visitor = FieldCounter::new(); visitor.visit(&request, &response, &Default::default()); diff --git a/apollo-router/src/plugins/coprocessor/execution.rs b/apollo-router/src/plugins/coprocessor/execution.rs index 77426ec5de..aee947a788 100644 --- a/apollo-router/src/plugins/coprocessor/execution.rs +++ b/apollo-router/src/plugins/coprocessor/execution.rs @@ -12,6 +12,7 @@ use tower_service::Service; use super::*; use crate::graphql; +use crate::json_ext::Value; use crate::layers::ServiceBuilderExt; use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer; use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; @@ -209,7 +210,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::from_slice::(&bytes)) + .then(|| serde_json::from_slice::(&bytes)) .transpose()?; let context_to_send = request_config.context.then(|| request.context.clone()); let sdl_to_send = request_config.sdl.then(|| sdl.clone().to_string()); @@ -252,10 +253,10 @@ where let code = control.get_http_status()?; let res = { - let graphql_response: crate::graphql::Response = - serde_json::from_value(co_processor_output.body.unwrap_or(serde_json::Value::Null)) + let graphql_response = + graphql::Response::from_value(co_processor_output.body.unwrap_or(Value::Null)) .unwrap_or_else(|error| { - crate::graphql::Response::builder() + graphql::Response::builder() .errors(vec![ Error::builder() .message(format!( @@ -295,9 +296,8 @@ where // Finally, process our reply and act on the contents. Our processing logic is // that we replace "bits" of our incoming request with the updated bits if they // are present in our co_processor_output. - - let new_body: crate::graphql::Request = match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, + let new_body: graphql::Request = match co_processor_output.body { + Some(value) => serde_json_bytes::from_value(value)?, None => body, }; @@ -358,7 +358,7 @@ where .transpose()?; let body_to_send = response_config .body - .then(|| serde_json::to_value(&first).expect("serialization will not fail")); + .then(|| serde_json_bytes::to_value(&first).expect("serialization will not fail")); let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.then(|| response.context.clone()); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); @@ -392,7 +392,7 @@ where // that we replace "bits" of our incoming response with the updated bits if they // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_body: graphql::Response = handle_graphql_response(first, co_processor_output.body)?; + let new_body = handle_graphql_response(first, co_processor_output.body)?; if let Some(control) = co_processor_output.control { parts.status = control.get_http_status()? @@ -425,7 +425,8 @@ where async move { let body_to_send = response_config.body.then(|| { - serde_json::to_value(&deferred_response).expect("serialization will not fail") + serde_json_bytes::to_value(&deferred_response) + .expect("serialization will not fail") }); let context_to_send = response_config .context @@ -459,7 +460,7 @@ where // that we replace "bits" of our incoming response with the updated bits if they // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_deferred_response: graphql::Response = + let new_deferred_response = handle_graphql_response(deferred_response, co_processor_output.body)?; if let Some(context) = co_processor_output.context { @@ -504,12 +505,13 @@ mod tests { use futures::future::BoxFuture; use http::StatusCode; - use serde_json::json; + use serde_json_bytes::json; use tower::BoxError; use tower::ServiceExt; use super::super::*; use super::*; + use crate::json_ext::Object; use crate::plugin::test::MockExecutionService; use crate::plugin::test::MockInternalHttpClientService; use crate::services::execution; @@ -610,7 +612,7 @@ mod tests { Ok(execution::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -680,7 +682,7 @@ mod tests { let request = execution::Request::fake_builder().build(); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), service .oneshot(request) .await @@ -788,7 +790,7 @@ mod tests { Ok(execution::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -797,7 +799,7 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let deserialized_response: Externalizable = + let deserialized_response: Externalizable = serde_json::from_slice(&get_body_bytes(res.into_body()).await.unwrap()) .unwrap(); @@ -894,7 +896,7 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); // the body should have changed: assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 42_u32 } }), ); } @@ -944,7 +946,7 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let mut deserialized_response: Externalizable = + let mut deserialized_response: Externalizable = serde_json::from_slice(&get_body_bytes(res.into_body()).await.unwrap()) .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); @@ -966,9 +968,7 @@ mod tests { .unwrap() .insert( "has_next".to_string(), - serde_json::Value::from( - deserialized_response.has_next.unwrap_or_default(), - ), + Value::from(deserialized_response.has_next.unwrap_or_default()), ); Ok(http::Response::builder() @@ -994,17 +994,17 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 1, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 2, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 3, "has_next": false }, "hasNext": false }), ); } diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index be693be843..1e095390ea 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -32,6 +32,7 @@ use tower::util::MapFutureLayer; use crate::configuration::shared::Client; use crate::error::Error; use crate::graphql; +use crate::json_ext::Value; use crate::layers::ServiceBuilderExt; use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer; use crate::plugin::Plugin; @@ -703,11 +704,11 @@ where .body .as_ref() .and_then(|b| serde_json::from_str(b).ok()) - .unwrap_or(serde_json::Value::Null); + .unwrap_or(Value::Null); // Now we have some JSON, let's see if it's the right "shape" to create a graphql_response. // If it isn't, we create a graphql error response - let graphql_response: crate::graphql::Response = match body_as_value { - serde_json::Value::Null => crate::graphql::Response::builder() + let graphql_response = match body_as_value { + Value::Null => graphql::Response::builder() .errors(vec![ Error::builder() .message(co_processor_output.body.take().unwrap_or_default()) @@ -715,8 +716,8 @@ where .build(), ]) .build(), - _ => serde_json::from_value(body_as_value).unwrap_or_else(|error| { - crate::graphql::Response::builder() + _ => graphql::Response::from_value(body_as_value).unwrap_or_else(|error| { + graphql::Response::builder() .errors(vec![ Error::builder() .message(format!( @@ -1014,7 +1015,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::to_value(&body)) + .then(|| serde_json_bytes::to_value(&body)) .transpose()?; let context_to_send = request_config.context.then(|| request.context.clone()); let uri = request_config.uri.then(|| parts.uri.to_string()); @@ -1059,29 +1060,28 @@ where let code = control.get_http_status()?; let res = { - let graphql_response: crate::graphql::Response = - match co_processor_output.body.unwrap_or(serde_json::Value::Null) { - serde_json::Value::String(s) => crate::graphql::Response::builder() + let graphql_response = match co_processor_output.body.unwrap_or(Value::Null) { + Value::String(s) => graphql::Response::builder() + .errors(vec![ + Error::builder() + .message(s.as_str().to_owned()) + .extension_code(COPROCESSOR_ERROR_EXTENSION) + .build(), + ]) + .build(), + value => graphql::Response::from_value(value).unwrap_or_else(|error| { + graphql::Response::builder() .errors(vec![ Error::builder() - .message(s) - .extension_code(COPROCESSOR_ERROR_EXTENSION) + .message(format!( + "couldn't deserialize coprocessor output body: {error}" + )) + .extension_code(COPROCESSOR_DESERIALIZATION_ERROR_EXTENSION) .build(), ]) - .build(), - value => serde_json::from_value(value).unwrap_or_else(|error| { - crate::graphql::Response::builder() - .errors(vec![ - Error::builder() - .message(format!( - "couldn't deserialize coprocessor output body: {error}" - )) - .extension_code(COPROCESSOR_DESERIALIZATION_ERROR_EXTENSION) - .build(), - ]) - .build() - }), - }; + .build() + }), + }; let mut http_response = http::Response::builder() .status(code) @@ -1113,9 +1113,8 @@ where // Finally, process our reply and act on the contents. Our processing logic is // that we replace "bits" of our incoming request with the updated bits if they // are present in our co_processor_output. - - let new_body: crate::graphql::Request = match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, + let new_body: graphql::Request = match co_processor_output.body { + Some(value) => serde_json_bytes::from_value(value)?, None => body, }; @@ -1178,7 +1177,7 @@ where let body_to_send = response_config .body - .then(|| serde_json::to_value(&body)) + .then(|| serde_json_bytes::to_value(&body)) .transpose()?; let context_to_send = response_config.context.then(|| response.context.clone()); let service_name = response_config.service_name.then_some(service_name); @@ -1215,8 +1214,7 @@ where // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_body: crate::graphql::Response = - handle_graphql_response(body, co_processor_output.body)?; + let new_body = handle_graphql_response(body, co_processor_output.body)?; response.response = http::Response::from_parts(parts, new_body); @@ -1287,27 +1285,25 @@ pub(super) fn internalize_header_map( pub(super) fn handle_graphql_response( original_response_body: graphql::Response, - copro_response_body: Option, + copro_response_body: Option, ) -> Result { - let new_body: graphql::Response = match copro_response_body { + Ok(match copro_response_body { Some(value) => { - let mut new_body: graphql::Response = serde_json::from_value(value)?; + let mut new_body = graphql::Response::from_value(value)?; // Needs to take back these 2 fields because it's skipped by serde new_body.subscribed = original_response_body.subscribed; new_body.created_at = original_response_body.created_at; // Required because for subscription if data is Some(Null) it won't cut the subscription // And in some languages they don't have any differences between Some(Null) and Null - if original_response_body.data == Some(serde_json_bytes::Value::Null) + if original_response_body.data == Some(Value::Null) && new_body.data.is_none() && new_body.subscribed == Some(true) { - new_body.data = Some(serde_json_bytes::Value::Null); + new_body.data = Some(Value::Null); } new_body } None => original_response_body, - }; - - Ok(new_body) + }) } diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index e4123ac519..eb78c5d98a 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -12,6 +12,7 @@ use tower_service::Service; use super::*; use crate::graphql; +use crate::json_ext::Value; use crate::layers::ServiceBuilderExt; use crate::layers::async_checkpoint::OneShotAsyncCheckpointLayer; use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; @@ -221,7 +222,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::from_slice::(&bytes)) + .then(|| serde_json::from_slice::(&bytes)) .transpose()?; let context_to_send = request_config.context.then(|| request.context.clone()); let sdl_to_send = request_config.sdl.then(|| sdl.clone().to_string()); @@ -260,10 +261,10 @@ where let code = control.get_http_status()?; let res = { - let graphql_response: crate::graphql::Response = - serde_json::from_value(co_processor_output.body.unwrap_or(serde_json::Value::Null)) + let graphql_response = + graphql::Response::from_value(co_processor_output.body.unwrap_or(Value::Null)) .unwrap_or_else(|error| { - crate::graphql::Response::builder() + graphql::Response::builder() .errors(vec![ Error::builder() .message(format!( @@ -303,9 +304,8 @@ where // Finally, process our reply and act on the contents. Our processing logic is // that we replace "bits" of our incoming request with the updated bits if they // are present in our co_processor_output. - - let new_body: crate::graphql::Request = match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, + let new_body: graphql::Request = match co_processor_output.body { + Some(value) => serde_json_bytes::from_value(value)?, None => body, }; @@ -374,7 +374,7 @@ where .transpose()?; let body_to_send = response_config .body - .then(|| serde_json::to_value(&first).expect("serialization will not fail")); + .then(|| serde_json_bytes::to_value(&first).expect("serialization will not fail")); let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.then(|| response.context.clone()); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); @@ -408,7 +408,7 @@ where // that we replace "bits" of our incoming response with the updated bits if they // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_body: graphql::Response = handle_graphql_response(first, co_processor_output.body)?; + let new_body = handle_graphql_response(first, co_processor_output.body)?; if let Some(control) = co_processor_output.control { parts.status = control.get_http_status()? @@ -449,7 +449,8 @@ where return Ok(deferred_response); } let body_to_send = response_config.body.then(|| { - serde_json::to_value(&deferred_response).expect("serialization will not fail") + serde_json_bytes::to_value(&deferred_response) + .expect("serialization will not fail") }); let context_to_send = response_config .context @@ -486,7 +487,7 @@ where // that we replace "bits" of our incoming response with the updated bits if they // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_deferred_response: graphql::Response = + let new_deferred_response = handle_graphql_response(deferred_response, co_processor_output.body)?; if let Some(context) = co_processor_output.context { @@ -531,12 +532,13 @@ mod tests { use futures::future::BoxFuture; use http::StatusCode; - use serde_json::json; + use serde_json_bytes::json; use tower::BoxError; use tower::ServiceExt; use super::super::*; use super::*; + use crate::json_ext::Object; use crate::plugin::test::MockInternalHttpClientService; use crate::plugin::test::MockSupergraphService; use crate::plugins::telemetry::config_new::conditions::SelectorOrValue; @@ -637,7 +639,7 @@ mod tests { Ok(supergraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -707,7 +709,7 @@ mod tests { let request = supergraph::Request::fake_builder().build().unwrap(); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), service .oneshot(request) .await @@ -814,7 +816,7 @@ mod tests { Ok(supergraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -887,7 +889,7 @@ mod tests { Ok(supergraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -896,7 +898,7 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |mut res: http::Request| { Box::pin(async move { - let deserialized_response: Externalizable = + let deserialized_response: Externalizable = serde_json::from_slice(&get_body_bytes(&mut res).await.unwrap()).unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); @@ -992,7 +994,7 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); // the body should have changed: assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 42_u32 } }), ); } @@ -1043,7 +1045,7 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let mut deserialized_response: Externalizable = + let mut deserialized_response: Externalizable = serde_json::from_slice(&get_body_bytes(res.into_body()).await.unwrap()) .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); @@ -1065,9 +1067,7 @@ mod tests { .unwrap() .insert( "has_next".to_string(), - serde_json::Value::from( - deserialized_response.has_next.unwrap_or_default(), - ), + Value::from(deserialized_response.has_next.unwrap_or_default()), ); Ok(http::Response::builder() @@ -1094,17 +1094,17 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 1, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 2, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 3, "has_next": false }, "hasNext": false }), ); } @@ -1161,7 +1161,7 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let mut deserialized_response: Externalizable = + let mut deserialized_response: Externalizable = serde_json::from_slice(&get_body_bytes(res.into_body()).await.unwrap()) .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); @@ -1183,9 +1183,7 @@ mod tests { .unwrap() .insert( "has_next".to_string(), - serde_json::Value::from( - deserialized_response.has_next.unwrap_or_default(), - ), + Value::from(deserialized_response.has_next.unwrap_or_default()), ); Ok(http::Response::builder() @@ -1212,17 +1210,17 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 1, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 2 }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 3 }, "hasNext": false }), ); } diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index 8a88494382..c80167e12c 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -13,13 +13,14 @@ mod tests { use mime::APPLICATION_JSON; use mime::TEXT_HTML; use router::body::RouterBody; - use serde_json::json; - use serde_json_bytes::Value; + use serde_json_bytes::json; use services::subgraph::SubgraphRequestId; use tower::BoxError; use tower::ServiceExt; use super::super::*; + use crate::json_ext::Object; + use crate::json_ext::Value; use crate::plugin::test::MockInternalHttpClientService; use crate::plugin::test::MockRouterService; use crate::plugin::test::MockSubgraphService; @@ -36,7 +37,7 @@ mod tests { #[tokio::test] async fn load_plugin() { - let config = json!({ + let config = serde_json::json!({ "coprocessor": { "url": "http://127.0.0.1:8081" } @@ -54,7 +55,7 @@ mod tests { #[tokio::test] async fn unknown_fields_are_denied() { - let config = json!({ + let config = serde_json::json!({ "coprocessor": { "url": "http://127.0.0.1:8081", "thisFieldDoesntExist": true @@ -75,7 +76,7 @@ mod tests { #[tokio::test] async fn external_plugin_with_stages_wont_load_without_graph_ref() { - let config = json!({ + let config = serde_json::json!({ "coprocessor": { "url": "http://127.0.0.1:8081", "stages": { @@ -325,7 +326,7 @@ mod tests { let request = subgraph::Request::fake_builder().build(); assert_eq!( - "couldn't deserialize coprocessor output body: missing field `message`", + "couldn't deserialize coprocessor output body: GraphQL response was malformed: missing required `message` property within error", service .oneshot(request) .await @@ -388,7 +389,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .id(req.id) .build()) @@ -396,7 +397,7 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = + let deserialized_request: Externalizable = serde_json::from_slice(&hyper::body::to_bytes(req.into_body()).await.unwrap()) .unwrap(); assert_eq!( @@ -469,7 +470,7 @@ mod tests { assert_eq!("5678", &*response.id); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), response.response.into_body().data.unwrap() ); } @@ -504,7 +505,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build()) }); @@ -540,7 +541,7 @@ mod tests { let request = subgraph::Request::fake_builder().build(); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), service .oneshot(request) .await @@ -666,7 +667,7 @@ mod tests { assert_eq!( actual_response, - serde_json::from_value(json!({ + serde_json_bytes::from_value(json!({ "errors": [{ "message": "my error message", "extensions": { @@ -700,7 +701,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .id(req.id) .build()) @@ -793,7 +794,7 @@ mod tests { ); assert_eq!( - serde_json_bytes::json!({ "test": 5678_u32 }), + json!({ "test": 5678_u32 }), response.response.into_body().data.unwrap() ); } @@ -827,7 +828,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build()) }); @@ -911,7 +912,7 @@ mod tests { ); assert_eq!( - serde_json_bytes::json!({ "test": 5678_u32 }), + json!({ "test": 5678_u32 }), response.response.into_body().data.unwrap() ); } @@ -1024,7 +1025,7 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = + let deserialized_request: Externalizable = serde_json::from_slice(&hyper::body::to_bytes(req.into_body()).await.unwrap()) .unwrap(); @@ -1137,7 +1138,7 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = + let deserialized_request: Externalizable = serde_json::from_slice(&hyper::body::to_bytes(req.into_body()).await.unwrap()) .unwrap(); @@ -1261,7 +1262,7 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = + let deserialized_request: Externalizable = serde_json::from_slice(&hyper::body::to_bytes(req.into_body()).await.unwrap()) .unwrap(); @@ -1358,7 +1359,7 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = + let deserialized_request: Externalizable = serde_json::from_slice(&get_body_bytes(req.into_body()).await.unwrap()) .unwrap(); @@ -1413,7 +1414,7 @@ mod tests { assert_eq!("a value", value); - let actual_response = serde_json::from_slice::( + let actual_response = serde_json::from_slice::( &hyper::body::to_bytes(response.into_body()).await.unwrap(), ) .unwrap(); @@ -1447,7 +1448,7 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = + let deserialized_request: Externalizable = serde_json::from_slice(&hyper::body::to_bytes(req.into_body()).await.unwrap()) .unwrap(); @@ -1490,7 +1491,7 @@ mod tests { .response; assert_eq!(response.status(), http::StatusCode::UNAUTHORIZED); - let actual_response = serde_json::from_slice::( + let actual_response = serde_json::from_slice::( &hyper::body::to_bytes(response.into_body()).await.unwrap(), ) .unwrap(); @@ -1534,11 +1535,10 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let deserialized_response: Externalizable = - serde_json::from_slice( - &hyper::body::to_bytes(res.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_response: Externalizable = serde_json::from_slice( + &hyper::body::to_bytes(res.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); assert_eq!( @@ -1633,7 +1633,7 @@ mod tests { // the body should have changed: assert_eq!( json!({ "data": { "test": 42_u32 } }), - serde_json::from_slice::( + serde_json::from_slice::( &get_body_bytes(res.response.into_body()).await.unwrap() ) .unwrap() diff --git a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs index 59f8b2df87..e275f12a65 100644 --- a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs +++ b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs @@ -819,7 +819,7 @@ mod tests { .as_object() .cloned() .unwrap_or_default(); - let response = Response::from_bytes("test", Bytes::from(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from(response_bytes)).unwrap(); let schema = DemandControlledSchema::new(Arc::new(schema.supergraph_schema().clone())).unwrap(); StaticCostCalculator::new(Arc::new(schema), Default::default(), 100) @@ -847,7 +847,7 @@ mod tests { .as_object() .cloned() .unwrap_or_default(); - let response = Response::from_bytes("test", Bytes::from(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from(response_bytes)).unwrap(); let schema = DemandControlledSchema::new(Arc::new(schema)).unwrap(); StaticCostCalculator::new(Arc::new(schema), Default::default(), 100) diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index 9974cf19d5..8b4d8bdced 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -786,7 +786,11 @@ fn http_response_to_graphql_response( // Application json expects valid graphql response if 2xx tracing::debug_span!("parse_subgraph_response").in_scope(|| { // Application graphql json expects valid graphql response - graphql::Response::from_bytes(service_name, body).unwrap_or_else(|error| { + graphql::Response::from_bytes(body).unwrap_or_else(|error| { + let error = FetchError::SubrequestMalformedResponse { + service: service_name.to_owned(), + reason: error.reason, + }; graphql::Response::builder() .error(error.to_graphql_error(None)) .build() @@ -802,7 +806,7 @@ fn http_response_to_graphql_response( if original_response.is_empty() { original_response = "".into() } - graphql::Response::from_bytes(service_name, body).unwrap_or_else(|_error| { + graphql::Response::from_bytes(body).unwrap_or_else(|_error| { graphql::Response::builder() .error( FetchError::SubrequestMalformedResponse { diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 85e34db10a..e318890892 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -103,11 +103,11 @@ async fn test_coprocessor_response_handling() -> Result<(), BoxError> { test_full_pipeline(500, "RouterRequest", empty_body_object).await; test_full_pipeline(500, "RouterResponse", empty_body_object).await; test_full_pipeline(200, "SupergraphRequest", empty_body_object).await; - test_full_pipeline(200, "SupergraphResponse", empty_body_object).await; + test_full_pipeline(500, "SupergraphResponse", empty_body_object).await; test_full_pipeline(200, "SubgraphRequest", empty_body_object).await; test_full_pipeline(200, "SubgraphResponse", empty_body_object).await; test_full_pipeline(200, "ExecutionRequest", empty_body_object).await; - test_full_pipeline(200, "ExecutionResponse", empty_body_object).await; + test_full_pipeline(500, "ExecutionResponse", empty_body_object).await; test_full_pipeline(200, "RouterRequest", remove_body).await; test_full_pipeline(200, "RouterResponse", remove_body).await; @@ -130,20 +130,12 @@ async fn test_coprocessor_response_handling() -> Result<(), BoxError> { } fn empty_body_object(mut body: serde_json::Value) -> serde_json::Value { - *body - .as_object_mut() - .expect("body") - .get_mut("body") - .expect("body") = serde_json::Value::Object(serde_json::Map::new()); + *body.pointer_mut("/body").expect("body") = json!({}); body } fn empty_body_string(mut body: serde_json::Value) -> serde_json::Value { - *body - .as_object_mut() - .expect("body") - .get_mut("body") - .expect("body") = serde_json::Value::String("".to_string()); + *body.pointer_mut("/body").expect("body") = json!(""); body } @@ -153,7 +145,7 @@ fn remove_body(mut body: serde_json::Value) -> serde_json::Value { } fn null_out_response(_body: serde_json::Value) -> serde_json::Value { - serde_json::Value::String("".to_string()) + json!("") } async fn test_full_pipeline( @@ -274,3 +266,51 @@ async fn test_coprocessor_demand_control_access() -> Result<(), BoxError> { Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +async fn test_coprocessor_proxying_error_response() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + + let mock_coprocessor = wiremock::MockServer::start().await; + let coprocessor_address = mock_coprocessor.uri(); + + Mock::given(method("POST")) + .and(path("/")) + .respond_with(|req: &wiremock::Request| { + let body = req.body_json::().expect("body"); + ResponseTemplate::new(200).set_body_json(body) + }) + .mount(&mock_coprocessor) + .await; + + let mut router = IntegrationTest::builder() + .config( + include_str!("fixtures/coprocessor.router.yaml") + .replace("", &coprocessor_address), + ) + .responder(ResponseTemplate::new(200).set_body_json(json!({ + "errors": [{ "message": "subgraph error", "path": [] }], + "data": null + }))) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + assert_eq!( + response.json::().await?, + json!({ + "errors": [{ "message": "Subgraph errors redacted", "path": [] }], + "data": null + }) + ); + + router.graceful_shutdown().await; + + Ok(()) +}