diff --git a/.changesets/fix_fix_coprocessor_data_null.md b/.changesets/fix_fix_coprocessor_data_null.md new file mode 100644 index 0000000000..973c0f5826 --- /dev/null +++ b/.changesets/fix_fix_coprocessor_data_null.md @@ -0,0 +1,27 @@ +### Fix Parsing of Coprocessor GraphQL Responses ([PR #7141](https://github.com/apollographql/router/pull/7141)) + +Previously Router ignored `data: null` property inside GraphQL response returned by coprocessor. +According to [GraphQL Spectification](https://spec.graphql.org/draft/#sel-FAPHLJCAACEBxlY): + +> If an error was raised during the execution that prevented a valid response, the "data" entry in the response should be null. + +That means if coprocessor returned valid execution error, for example: + +```json +{ + "data": null, + "errors": [{ "message": "Some execution error" }] +} +``` + +Router violated above restriction from GraphQL Specification by returning following response to client: + +```json +{ + "errors": [{ "message": "Some execution error" }] +} +``` + +This fix ensures full compliance with the GraphQL specification by preserving the complete structure of error responses from coprocessors. + +Contributed by [@IvanGoncharov](https://github.com/IvanGoncharov) in [#7141](https://github.com/apollographql/router/pull/7141) diff --git a/apollo-router/src/graphql/mod.rs b/apollo-router/src/graphql/mod.rs index 05cb3ddb12..28ff608d92 100644 --- a/apollo-router/src/graphql/mod.rs +++ b/apollo-router/src/graphql/mod.rs @@ -13,6 +13,7 @@ use futures::Stream; use heck::ToShoutySnakeCase; pub use request::Request; pub use response::IncrementalResponse; +use response::MalformedResponseError; pub use response::Response; use serde::Deserialize; use serde::Serialize; @@ -21,7 +22,6 @@ use serde_json_bytes::Map as JsonMap; use serde_json_bytes::Value; pub(crate) use visitor::ResponseVisitor; -use crate::error::FetchError; use crate::json_ext::Object; use crate::json_ext::Path; pub use crate::json_ext::Path as JsonPath; @@ -125,41 +125,39 @@ impl Error { } } - pub(crate) fn from_value(service_name: &str, value: Value) -> Result { - let mut object = - ensure_object!(value).map_err(|error| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), - reason: format!("invalid error within `errors`: {}", error), - })?; + pub(crate) fn from_value(value: Value) -> Result { + let mut object = ensure_object!(value).map_err(|error| MalformedResponseError { + reason: format!("invalid error within `errors`: {}", error), + })?; let extensions = extract_key_value_from_object!(object, "extensions", Value::Object(o) => o) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: format!("invalid `extensions` within error: {}", err), })? .unwrap_or_default(); - let message = extract_key_value_from_object!(object, "message", Value::String(s) => s) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + let message = match extract_key_value_from_object!(object, "message", Value::String(s) => s) + { + Ok(Some(s)) => Ok(s.as_str().to_string()), + Ok(None) => Err(MalformedResponseError { + reason: "missing required `message` property within error".to_owned(), + }), + Err(err) => Err(MalformedResponseError { reason: format!("invalid `message` within error: {}", err), - })? - .map(|s| s.as_str().to_string()) - .unwrap_or_default(); + }), + }?; let locations = extract_key_value_from_object!(object, "locations") .map(skip_invalid_locations) .map(serde_json_bytes::from_value) .transpose() - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: format!("invalid `locations` within error: {}", err), })? .unwrap_or_default(); let path = extract_key_value_from_object!(object, "path") .map(serde_json_bytes::from_value) .transpose() - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: format!("invalid `path` within error: {}", err), })?; diff --git a/apollo-router/src/graphql/request.rs b/apollo-router/src/graphql/request.rs index 4057c95e9a..627a102757 100644 --- a/apollo-router/src/graphql/request.rs +++ b/apollo-router/src/graphql/request.rs @@ -6,10 +6,10 @@ use serde::de::DeserializeSeed; use serde::de::Error; use serde_json_bytes::ByteString; use serde_json_bytes::Map as JsonMap; -use serde_json_bytes::Value; use crate::configuration::BatchingMode; use crate::json_ext::Object; +use crate::json_ext::Value; /// A GraphQL `Request` used to represent both supergraph and subgraph requests. #[derive(Clone, Derivative, Serialize, Deserialize, Default)] @@ -169,10 +169,10 @@ impl Request { pub(crate) fn batch_from_urlencoded_query( url_encoded_query: String, ) -> Result, serde_json::Error> { - let value: serde_json::Value = serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) + let value: Value = serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) .map_err(serde_json::Error::custom)?; - Request::process_query_values(&value) + Request::process_query_values(value) } /// Convert Bytes into a GraphQL [`Request`]. @@ -180,23 +180,22 @@ impl Request { /// An error will be produced in the event that the bytes array cannot be /// turned into a valid GraphQL `Request`. pub(crate) fn batch_from_bytes(bytes: &[u8]) -> Result, serde_json::Error> { - let value: serde_json::Value = - serde_json::from_slice(bytes).map_err(serde_json::Error::custom)?; + let value: Value = serde_json::from_slice(bytes).map_err(serde_json::Error::custom)?; - Request::process_batch_values(&value) + Request::process_batch_values(value) } - fn allocate_result_array(value: &serde_json::Value) -> Vec { + fn allocate_result_array(value: &Value) -> Vec { match value.as_array() { Some(array) => Vec::with_capacity(array.len()), None => Vec::with_capacity(1), } } - fn process_batch_values(value: &serde_json::Value) -> Result, serde_json::Error> { - let mut result = Request::allocate_result_array(value); + fn process_batch_values(value: Value) -> Result, serde_json::Error> { + let mut result = Request::allocate_result_array(&value); - if let serde_json::Value::Array(entries) = value { + if let Value::Array(entries) = value { u64_histogram!( "apollo.router.operations.batching.size", "Number of queries contained within each query batch", @@ -211,20 +210,20 @@ impl Request { mode = BatchingMode::BatchHttpLink.to_string() // Only supported mode right now ); for entry in entries { - let bytes = serde_json::to_vec(entry)?; + let bytes = serde_json::to_vec(&entry)?; result.push(Request::deserialize_from_bytes(&bytes.into())?); } } else { - let bytes = serde_json::to_vec(value)?; + let bytes = serde_json::to_vec(&value)?; result.push(Request::deserialize_from_bytes(&bytes.into())?); } Ok(result) } - fn process_query_values(value: &serde_json::Value) -> Result, serde_json::Error> { - let mut result = Request::allocate_result_array(value); + fn process_query_values(value: Value) -> Result, serde_json::Error> { + let mut result = Request::allocate_result_array(&value); - if let serde_json::Value::Array(entries) = value { + if let Value::Array(entries) = value { u64_histogram!( "apollo.router.operations.batching.size", "Number of queries contained within each query batch", @@ -239,42 +238,36 @@ impl Request { mode = BatchingMode::BatchHttpLink.to_string() // Only supported mode right now ); for entry in entries { - result.push(Request::process_value(entry)?); + result.push(Request::process_value(&entry)?); } } else { - result.push(Request::process_value(value)?) + result.push(Request::process_value(&value)?) } Ok(result) } - fn process_value(value: &serde_json::Value) -> Result { - let operation_name = - if let Some(serde_json::Value::String(operation_name)) = value.get("operationName") { - Some(operation_name.clone()) - } else { - None - }; - - let query = if let Some(serde_json::Value::String(query)) = value.get("query") { - Some(query.as_str()) - } else { - None - }; - let variables: Object = get_from_urlencoded_value(value, "variables")?.unwrap_or_default(); - let extensions: Object = - get_from_urlencoded_value(value, "extensions")?.unwrap_or_default(); - - let request_builder = Self::builder() + fn process_value(value: &Value) -> Result { + let operation_name = value.get("operationName").and_then(Value::as_str); + let query = value.get("query").and_then(Value::as_str).map(String::from); + let variables: Object = value + .get("variables") + .and_then(Value::as_str) + .map(serde_json::from_str) + .transpose()? + .unwrap_or_default(); + let extensions: Object = value + .get("extensions") + .and_then(Value::as_str) + .map(serde_json::from_str) + .transpose()? + .unwrap_or_default(); + + let request = Self::builder() + .and_query(query) .variables(variables) .and_operation_name(operation_name) - .extensions(extensions); - - let request = if let Some(query_str) = query { - request_builder.query(query_str).build() - } else { - request_builder.build() - }; - + .extensions(extensions) + .build(); Ok(request) } @@ -284,25 +277,13 @@ impl Request { /// An error will be produced in the event that the query string parameters /// cannot be turned into a valid GraphQL `Request`. pub fn from_urlencoded_query(url_encoded_query: String) -> Result { - let urldecoded: serde_json::Value = - serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) - .map_err(serde_json::Error::custom)?; + let urldecoded: Value = serde_urlencoded::from_bytes(url_encoded_query.as_bytes()) + .map_err(serde_json::Error::custom)?; Request::process_value(&urldecoded) } } -fn get_from_urlencoded_value<'a, T: Deserialize<'a>>( - object: &'a serde_json::Value, - key: &str, -) -> Result, serde_json::Error> { - if let Some(serde_json::Value::String(byte_string)) = object.get(key) { - Some(serde_json::from_str(byte_string.as_str())).transpose() - } else { - Ok(None) - } -} - struct RequestFromBytesSeed<'data>(&'data Bytes); impl<'de> DeserializeSeed<'de> for RequestFromBytesSeed<'_> { diff --git a/apollo-router/src/graphql/response.rs b/apollo-router/src/graphql/response.rs index c4344c05c9..f14c2762bf 100644 --- a/apollo-router/src/graphql/response.rs +++ b/apollo-router/src/graphql/response.rs @@ -3,18 +3,25 @@ use std::time::Instant; use apollo_compiler::response::ExecutionResponse; use bytes::Bytes; +use displaydoc::Display; use serde::Deserialize; use serde::Serialize; use serde_json_bytes::ByteString; use serde_json_bytes::Map; use crate::error::Error; -use crate::error::FetchError; use crate::graphql::IntoGraphQLErrors; use crate::json_ext::Object; use crate::json_ext::Path; use crate::json_ext::Value; +#[derive(thiserror::Error, Display, Debug, Eq, PartialEq)] +#[error("GraphQL response was malformed: {reason}")] +pub(crate) struct MalformedResponseError { + /// The reason the deserialization failed. + pub(crate) reason: String, +} + /// A graphql primary response. /// Used for federated and subgraph queries. #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Default)] @@ -97,63 +104,50 @@ impl Response { /// Create a [`Response`] from the supplied [`Bytes`]. /// /// This will return an error (identifying the faulty service) if the input is invalid. - pub(crate) fn from_bytes(service_name: &str, b: Bytes) -> Result { - let value = - Value::from_bytes(b).map_err(|error| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), - reason: error.to_string(), - })?; - let object = - ensure_object!(value).map_err(|error| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), - reason: error.to_string(), - })?; - Response::from_object(service_name, object) + pub(crate) fn from_bytes(b: Bytes) -> Result { + let value = Value::from_bytes(b).map_err(|error| MalformedResponseError { + reason: error.to_string(), + })?; + Response::from_value(value) } - pub(crate) fn from_object( - service_name: &str, - mut object: Object, - ) -> Result { + pub(crate) fn from_value(value: Value) -> Result { + let mut object = ensure_object!(value).map_err(|error| MalformedResponseError { + reason: error.to_string(), + })?; let data = object.remove("data"); let errors = extract_key_value_from_object!(object, "errors", Value::Array(v) => v) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })? .into_iter() .flatten() - .map(|v| Error::from_value(service_name, v)) - .collect::, FetchError>>()?; + .map(Error::from_value) + .collect::, MalformedResponseError>>()?; let extensions = extract_key_value_from_object!(object, "extensions", Value::Object(o) => o) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })? .unwrap_or_default(); let label = extract_key_value_from_object!(object, "label", Value::String(s) => s) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })? .map(|s| s.as_str().to_string()); let path = extract_key_value_from_object!(object, "path") .map(serde_json_bytes::from_value) .transpose() - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })?; let has_next = extract_key_value_from_object!(object, "hasNext", Value::Bool(b) => b) - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })?; let incremental = extract_key_value_from_object!(object, "incremental", Value::Array(a) => a).map_err( - |err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + |err| MalformedResponseError { reason: err.to_string(), }, )?; @@ -162,8 +156,7 @@ impl Response { .into_iter() .map(serde_json_bytes::from_value) .collect::, _>>() - .map_err(|err| FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + .map_err(|err| MalformedResponseError { reason: err.to_string(), })?, None => vec![], @@ -172,8 +165,7 @@ impl Response { // If the data entry in the response is not present, the errors entry in the response must not be empty. // It must contain at least one error. The errors it contains should indicate why no data was able to be returned. if data.is_none() && errors.is_empty() { - return Err(FetchError::SubrequestMalformedResponse { - service: service_name.to_string(), + return Err(MalformedResponseError { reason: "graphql response without data must contain at least one error".to_string(), }); } @@ -481,13 +473,21 @@ mod tests { #[test] fn test_no_data_and_no_errors() { - let response = Response::from_bytes("test", "{\"errors\":null}".into()); + let response = Response::from_bytes("{\"errors\":null}".into()); assert_eq!( response.expect_err("no data and no errors"), - FetchError::SubrequestMalformedResponse { - service: "test".to_string(), + MalformedResponseError { reason: "graphql response without data must contain at least one error".to_string(), } ); } + + #[test] + fn test_data_null() { + let response = Response::from_bytes("{\"data\":null}".into()).unwrap(); + assert_eq!( + response, + Response::builder().data(Some(Value::Null)).build(), + ); + } } diff --git a/apollo-router/src/graphql/visitor.rs b/apollo-router/src/graphql/visitor.rs index e400d71979..0f25af1030 100644 --- a/apollo-router/src/graphql/visitor.rs +++ b/apollo-router/src/graphql/visitor.rs @@ -135,7 +135,7 @@ mod tests { let schema = Schema::parse_and_validate(schema_str, "").unwrap(); let request = ExecutableDocument::parse(&schema, query_str, "").unwrap(); - let response = Response::from_bytes("test", Bytes::from_static(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from_static(response_bytes)).unwrap(); let mut visitor = FieldCounter::new(); visitor.visit(&request, &response, &Default::default()); @@ -150,7 +150,7 @@ mod tests { let schema = Schema::parse_and_validate(schema_str, "").unwrap(); let request = ExecutableDocument::parse(&schema, query_str, "").unwrap(); - let response = Response::from_bytes("test", Bytes::from_static(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from_static(response_bytes)).unwrap(); let mut visitor = FieldCounter::new(); visitor.visit(&request, &response, &Default::default()); @@ -165,7 +165,7 @@ mod tests { let schema = Schema::parse_and_validate(schema_str, "").unwrap(); let request = ExecutableDocument::parse(&schema, query_str, "").unwrap(); - let response = Response::from_bytes("test", Bytes::from_static(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from_static(response_bytes)).unwrap(); let mut visitor = FieldCounter::new(); visitor.visit(&request, &response, &Default::default()); @@ -180,7 +180,7 @@ mod tests { let schema = Schema::parse_and_validate(schema_str, "").unwrap(); let request = ExecutableDocument::parse(&schema, query_str, "").unwrap(); - let response = Response::from_bytes("test", Bytes::from_static(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from_static(response_bytes)).unwrap(); let mut visitor = FieldCounter::new(); visitor.visit(&request, &response, &Default::default()); diff --git a/apollo-router/src/metrics/mod.rs b/apollo-router/src/metrics/mod.rs index 86014b638a..b84549ee89 100644 --- a/apollo-router/src/metrics/mod.rs +++ b/apollo-router/src/metrics/mod.rs @@ -2110,19 +2110,16 @@ mod test { ]; let errors = codes.map(|code| { - graphql::Error::from_value( - "mySubgraph", - json!( - { - "message": "error occurred", - "extensions": { - "code": code, - "service": "mySubgraph" - }, - "path": ["obj", "field"] - } - ), - ) + graphql::Error::from_value(json!( + { + "message": "error occurred", + "extensions": { + "code": code, + "service": "mySubgraph" + }, + "path": ["obj", "field"] + } + )) .unwrap() }); @@ -2221,19 +2218,16 @@ mod test { ]; let errors = codes.map(|code| { - graphql::Error::from_value( - "mySubgraph", - json!( - { - "message": "error occurred", - "extensions": { - "code": code, - "service": "mySubgraph" - }, - "path": ["obj", "field"] - } - ), - ) + graphql::Error::from_value(json!( + { + "message": "error occurred", + "extensions": { + "code": code, + "service": "mySubgraph" + }, + "path": ["obj", "field"] + } + )) .unwrap() }); diff --git a/apollo-router/src/plugins/coprocessor/execution.rs b/apollo-router/src/plugins/coprocessor/execution.rs index 4e7b1c1580..96ba713f9d 100644 --- a/apollo-router/src/plugins/coprocessor/execution.rs +++ b/apollo-router/src/plugins/coprocessor/execution.rs @@ -12,6 +12,7 @@ use tower_service::Service; use super::*; use crate::graphql; +use crate::json_ext::Value; use crate::layers::ServiceBuilderExt; use crate::layers::async_checkpoint::AsyncCheckpointLayer; use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; @@ -206,7 +207,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::from_slice::(&bytes)) + .then(|| serde_json::from_slice::(&bytes)) .transpose()?; let context_to_send = request_config.context.get_context(&request.context); let sdl_to_send = request_config.sdl.then(|| sdl.clone().to_string()); @@ -247,10 +248,10 @@ where let code = control.get_http_status()?; let res = { - let graphql_response: crate::graphql::Response = - serde_json::from_value(co_processor_output.body.unwrap_or(serde_json::Value::Null)) + let graphql_response = + graphql::Response::from_value(co_processor_output.body.unwrap_or(Value::Null)) .unwrap_or_else(|error| { - crate::graphql::Response::builder() + graphql::Response::builder() .errors(vec![ Error::builder() .message(format!( @@ -295,9 +296,8 @@ where // Finally, process our reply and act on the contents. Our processing logic is // that we replace "bits" of our incoming request with the updated bits if they // are present in our co_processor_output. - - let new_body: crate::graphql::Request = match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, + let new_body: graphql::Request = match co_processor_output.body { + Some(value) => serde_json_bytes::from_value(value)?, None => body, }; @@ -362,7 +362,7 @@ where .transpose()?; let body_to_send = response_config .body - .then(|| serde_json::to_value(&first).expect("serialization will not fail")); + .then(|| serde_json_bytes::to_value(&first).expect("serialization will not fail")); let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.get_context(&response.context); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); @@ -394,7 +394,7 @@ where // that we replace "bits" of our incoming response with the updated bits if they // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_body: graphql::Response = handle_graphql_response(first, co_processor_output.body)?; + let new_body = handle_graphql_response(first, co_processor_output.body)?; if let Some(control) = co_processor_output.control { parts.status = control.get_http_status()? @@ -433,7 +433,8 @@ where async move { let body_to_send = response_config.body.then(|| { - serde_json::to_value(&deferred_response).expect("serialization will not fail") + serde_json_bytes::to_value(&deferred_response) + .expect("serialization will not fail") }); let context_to_send = response_config_context.get_context(&generator_map_context); @@ -463,7 +464,7 @@ where // that we replace "bits" of our incoming response with the updated bits if they // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_deferred_response: graphql::Response = + let new_deferred_response = handle_graphql_response(deferred_response, co_processor_output.body)?; if let Some(context) = co_processor_output.context { @@ -513,12 +514,13 @@ mod tests { use futures::future::BoxFuture; use http::StatusCode; - use serde_json::json; + use serde_json_bytes::json; use tower::BoxError; use tower::ServiceExt; use super::super::*; use super::*; + use crate::json_ext::Object; use crate::plugin::test::MockExecutionService; use crate::plugin::test::MockInternalHttpClientService; use crate::services::execution; @@ -619,7 +621,7 @@ mod tests { Ok(execution::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -689,7 +691,7 @@ mod tests { let request = execution::Request::fake_builder().build(); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), service .oneshot(request) .await @@ -797,7 +799,7 @@ mod tests { Ok(execution::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -806,11 +808,10 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let deserialized_response: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(res.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_response: Externalizable = serde_json::from_slice( + &router::body::into_bytes(res.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); assert_eq!( @@ -907,7 +908,7 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); // the body should have changed: assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 42_u32 } }), ); } @@ -957,11 +958,10 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let mut deserialized_response: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(res.into_body()).await.unwrap(), - ) - .unwrap(); + let mut deserialized_response: Externalizable = serde_json::from_slice( + &router::body::into_bytes(res.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); assert_eq!( PipelineStep::ExecutionResponse.to_string(), @@ -981,9 +981,7 @@ mod tests { .unwrap() .insert( "has_next".to_string(), - serde_json::Value::from( - deserialized_response.has_next.unwrap_or_default(), - ), + Value::from(deserialized_response.has_next.unwrap_or_default()), ); Ok(http::Response::builder() @@ -1009,17 +1007,17 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 1, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 2, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 3, "has_next": false }, "hasNext": false }), ); } diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index d5b6e7c9be..00bf6f3674 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -38,6 +38,7 @@ use crate::context::context_key_from_deprecated; use crate::context::context_key_to_deprecated; use crate::error::Error; use crate::graphql; +use crate::json_ext::Value; use crate::layers::ServiceBuilderExt; use crate::layers::async_checkpoint::AsyncCheckpointLayer; use crate::plugin::Plugin; @@ -769,11 +770,11 @@ where .body .as_ref() .and_then(|b| serde_json::from_str(b).ok()) - .unwrap_or(serde_json::Value::Null); + .unwrap_or(Value::Null); // Now we have some JSON, let's see if it's the right "shape" to create a graphql_response. // If it isn't, we create a graphql error response - let graphql_response: crate::graphql::Response = match body_as_value { - serde_json::Value::Null => crate::graphql::Response::builder() + let graphql_response = match body_as_value { + Value::Null => graphql::Response::builder() .errors(vec![ Error::builder() .message(co_processor_output.body.take().unwrap_or_default()) @@ -781,8 +782,8 @@ where .build(), ]) .build(), - _ => serde_json::from_value(body_as_value).unwrap_or_else(|error| { - crate::graphql::Response::builder() + _ => graphql::Response::from_value(body_as_value).unwrap_or_else(|error| { + graphql::Response::builder() .errors(vec![ Error::builder() .message(format!( @@ -1094,7 +1095,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::to_value(&body)) + .then(|| serde_json_bytes::to_value(&body)) .transpose()?; let context_to_send = request_config.context.get_context(&request.context); let uri = request_config.uri.then(|| parts.uri.to_string()); @@ -1137,29 +1138,28 @@ where let code = control.get_http_status()?; let res = { - let graphql_response: crate::graphql::Response = - match co_processor_output.body.unwrap_or(serde_json::Value::Null) { - serde_json::Value::String(s) => crate::graphql::Response::builder() + let graphql_response = match co_processor_output.body.unwrap_or(Value::Null) { + Value::String(s) => graphql::Response::builder() + .errors(vec![ + Error::builder() + .message(s.as_str().to_owned()) + .extension_code(COPROCESSOR_ERROR_EXTENSION) + .build(), + ]) + .build(), + value => graphql::Response::from_value(value).unwrap_or_else(|error| { + graphql::Response::builder() .errors(vec![ Error::builder() - .message(s) - .extension_code(COPROCESSOR_ERROR_EXTENSION) + .message(format!( + "couldn't deserialize coprocessor output body: {error}" + )) + .extension_code(COPROCESSOR_DESERIALIZATION_ERROR_EXTENSION) .build(), ]) - .build(), - value => serde_json::from_value(value).unwrap_or_else(|error| { - crate::graphql::Response::builder() - .errors(vec![ - Error::builder() - .message(format!( - "couldn't deserialize coprocessor output body: {error}" - )) - .extension_code(COPROCESSOR_DESERIALIZATION_ERROR_EXTENSION) - .build(), - ]) - .build() - }), - }; + .build() + }), + }; let mut http_response = http::Response::builder() .status(code) @@ -1196,9 +1196,8 @@ where // Finally, process our reply and act on the contents. Our processing logic is // that we replace "bits" of our incoming request with the updated bits if they // are present in our co_processor_output. - - let new_body: crate::graphql::Request = match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, + let new_body: graphql::Request = match co_processor_output.body { + Some(value) => serde_json_bytes::from_value(value)?, None => body, }; @@ -1265,7 +1264,7 @@ where let body_to_send = response_config .body - .then(|| serde_json::to_value(&body)) + .then(|| serde_json_bytes::to_value(&body)) .transpose()?; let context_to_send = response_config.context.get_context(&response.context); let service_name = response_config.service_name.then_some(service_name); @@ -1300,8 +1299,7 @@ where // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_body: crate::graphql::Response = - handle_graphql_response(body, co_processor_output.body)?; + let new_body = handle_graphql_response(body, co_processor_output.body)?; response.response = http::Response::from_parts(parts, new_body); @@ -1377,27 +1375,25 @@ pub(super) fn internalize_header_map( pub(super) fn handle_graphql_response( original_response_body: graphql::Response, - copro_response_body: Option, + copro_response_body: Option, ) -> Result { - let new_body: graphql::Response = match copro_response_body { + Ok(match copro_response_body { Some(value) => { - let mut new_body: graphql::Response = serde_json::from_value(value)?; + let mut new_body = graphql::Response::from_value(value)?; // Needs to take back these 2 fields because it's skipped by serde new_body.subscribed = original_response_body.subscribed; new_body.created_at = original_response_body.created_at; // Required because for subscription if data is Some(Null) it won't cut the subscription // And in some languages they don't have any differences between Some(Null) and Null - if original_response_body.data == Some(serde_json_bytes::Value::Null) + if original_response_body.data == Some(Value::Null) && new_body.data.is_none() && new_body.subscribed == Some(true) { - new_body.data = Some(serde_json_bytes::Value::Null); + new_body.data = Some(Value::Null); } new_body } None => original_response_body, - }; - - Ok(new_body) + }) } diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 77ebb60a1f..166da22c56 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -12,6 +12,7 @@ use tower_service::Service; use super::*; use crate::graphql; +use crate::json_ext::Value; use crate::layers::ServiceBuilderExt; use crate::layers::async_checkpoint::AsyncCheckpointLayer; use crate::plugins::coprocessor::EXTERNAL_SPAN_NAME; @@ -218,7 +219,7 @@ where let body_to_send = request_config .body - .then(|| serde_json::from_slice::(&bytes)) + .then(|| serde_json::from_slice::(&bytes)) .transpose()?; let context_to_send = request_config.context.get_context(&request.context); let sdl_to_send = request_config.sdl.then(|| sdl.clone().to_string()); @@ -255,10 +256,10 @@ where let code = control.get_http_status()?; let res = { - let graphql_response: crate::graphql::Response = - serde_json::from_value(co_processor_output.body.unwrap_or(serde_json::Value::Null)) + let graphql_response = + graphql::Response::from_value(co_processor_output.body.unwrap_or(Value::Null)) .unwrap_or_else(|error| { - crate::graphql::Response::builder() + graphql::Response::builder() .errors(vec![ Error::builder() .message(format!( @@ -303,9 +304,8 @@ where // Finally, process our reply and act on the contents. Our processing logic is // that we replace "bits" of our incoming request with the updated bits if they // are present in our co_processor_output. - - let new_body: crate::graphql::Request = match co_processor_output.body { - Some(value) => serde_json::from_value(value)?, + let new_body: graphql::Request = match co_processor_output.body { + Some(value) => serde_json_bytes::from_value(value)?, None => body, }; @@ -378,7 +378,7 @@ where .transpose()?; let body_to_send = response_config .body - .then(|| serde_json::to_value(&first).expect("serialization will not fail")); + .then(|| serde_json_bytes::to_value(&first).expect("serialization will not fail")); let status_to_send = response_config.status_code.then(|| parts.status.as_u16()); let context_to_send = response_config.context.get_context(&response.context); let sdl_to_send = response_config.sdl.then(|| sdl.clone().to_string()); @@ -410,7 +410,7 @@ where // that we replace "bits" of our incoming response with the updated bits if they // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_body: graphql::Response = handle_graphql_response(first, co_processor_output.body)?; + let new_body = handle_graphql_response(first, co_processor_output.body)?; if let Some(control) = co_processor_output.control { parts.status = control.get_http_status()? @@ -456,7 +456,8 @@ where return Ok(deferred_response); } let body_to_send = response_config.body.then(|| { - serde_json::to_value(&deferred_response).expect("serialization will not fail") + serde_json_bytes::to_value(&deferred_response) + .expect("serialization will not fail") }); let context_to_send = response_config_context.get_context(&generator_map_context); @@ -489,7 +490,7 @@ where // that we replace "bits" of our incoming response with the updated bits if they // are present in our co_processor_output. If they aren't present, just use the // bits that we sent to the co_processor. - let new_deferred_response: graphql::Response = + let new_deferred_response = handle_graphql_response(deferred_response, co_processor_output.body)?; if let Some(context) = co_processor_output.context { @@ -539,12 +540,13 @@ mod tests { use futures::future::BoxFuture; use http::StatusCode; - use serde_json::json; + use serde_json_bytes::json; use tower::BoxError; use tower::ServiceExt; use super::super::*; use super::*; + use crate::json_ext::Object; use crate::plugin::test::MockInternalHttpClientService; use crate::plugin::test::MockSupergraphService; use crate::plugins::telemetry::config_new::conditions::SelectorOrValue; @@ -645,7 +647,7 @@ mod tests { Ok(supergraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -715,7 +717,7 @@ mod tests { let request = supergraph::Request::fake_builder().build().unwrap(); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), service .oneshot(request) .await @@ -822,7 +824,7 @@ mod tests { Ok(supergraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -895,7 +897,7 @@ mod tests { Ok(supergraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .build() .unwrap()) @@ -904,7 +906,7 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |mut res: http::Request| { Box::pin(async move { - let deserialized_response: Externalizable = + let deserialized_response: Externalizable = serde_json::from_slice(&router::body::into_bytes(&mut res).await.unwrap()) .unwrap(); @@ -1003,7 +1005,7 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); // the body should have changed: assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 42_u32 } }), ); } @@ -1054,11 +1056,10 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let mut deserialized_response: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(res.into_body()).await.unwrap(), - ) - .unwrap(); + let mut deserialized_response: Externalizable = serde_json::from_slice( + &router::body::into_bytes(res.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); assert_eq!( PipelineStep::SupergraphResponse.to_string(), @@ -1078,9 +1079,7 @@ mod tests { .unwrap() .insert( "has_next".to_string(), - serde_json::Value::from( - deserialized_response.has_next.unwrap_or_default(), - ), + Value::from(deserialized_response.has_next.unwrap_or_default()), ); Ok(http::Response::builder() @@ -1107,17 +1106,17 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 1, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 2, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 3, "has_next": false }, "hasNext": false }), ); } @@ -1174,11 +1173,10 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let mut deserialized_response: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(res.into_body()).await.unwrap(), - ) - .unwrap(); + let mut deserialized_response: Externalizable = serde_json::from_slice( + &router::body::into_bytes(res.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); assert_eq!( PipelineStep::SupergraphResponse.to_string(), @@ -1198,9 +1196,7 @@ mod tests { .unwrap() .insert( "has_next".to_string(), - serde_json::Value::from( - deserialized_response.has_next.unwrap_or_default(), - ), + Value::from(deserialized_response.has_next.unwrap_or_default()), ); Ok(http::Response::builder() @@ -1227,17 +1223,17 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 1, "has_next": true }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 2 }, "hasNext": true }), ); let body = res.response.body_mut().next().await.unwrap(); assert_eq!( - serde_json::to_value(&body).unwrap(), + serde_json_bytes::to_value(&body).unwrap(), json!({ "data": { "test": 3 }, "hasNext": false }), ); } diff --git a/apollo-router/src/plugins/coprocessor/test.rs b/apollo-router/src/plugins/coprocessor/test.rs index a5f7dcd8e3..1c6c4c2951 100644 --- a/apollo-router/src/plugins/coprocessor/test.rs +++ b/apollo-router/src/plugins/coprocessor/test.rs @@ -13,13 +13,14 @@ mod tests { use mime::APPLICATION_JSON; use mime::TEXT_HTML; use router::body::RouterBody; - use serde_json::json; - use serde_json_bytes::Value; + use serde_json_bytes::json; use services::subgraph::SubgraphRequestId; use tower::BoxError; use tower::ServiceExt; use super::super::*; + use crate::json_ext::Object; + use crate::json_ext::Value; use crate::plugin::test::MockInternalHttpClientService; use crate::plugin::test::MockRouterService; use crate::plugin::test::MockSubgraphService; @@ -36,7 +37,7 @@ mod tests { #[tokio::test] async fn load_plugin() { - let config = json!({ + let config = serde_json::json!({ "coprocessor": { "url": "http://127.0.0.1:8081" } @@ -54,7 +55,7 @@ mod tests { #[tokio::test] async fn unknown_fields_are_denied() { - let config = json!({ + let config = serde_json::json!({ "coprocessor": { "url": "http://127.0.0.1:8081", "thisFieldDoesntExist": true @@ -75,7 +76,7 @@ mod tests { #[tokio::test] async fn external_plugin_with_stages_wont_load_without_graph_ref() { - let config = json!({ + let config = serde_json::json!({ "coprocessor": { "url": "http://127.0.0.1:8081", "stages": { @@ -331,7 +332,7 @@ mod tests { let request = subgraph::Request::fake_builder().build(); assert_eq!( - "couldn't deserialize coprocessor output body: missing field `message`", + "couldn't deserialize coprocessor output body: GraphQL response was malformed: missing required `message` property within error", service .oneshot(request) .await @@ -394,7 +395,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .id(req.id) .subgraph_name(String::default()) @@ -403,11 +404,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!( deserialized_request.subgraph_request_id.as_deref(), Some("5678") @@ -478,7 +478,7 @@ mod tests { assert_eq!("5678", &*response.id); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), response.response.into_body().data.unwrap() ); } @@ -535,7 +535,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .id(req.id) .subgraph_name(String::default()) @@ -544,11 +544,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!( deserialized_request.subgraph_request_id.as_deref(), Some("5678") @@ -639,7 +638,7 @@ mod tests { assert_eq!("5678", &*response.id); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), response.response.into_body().data.unwrap() ); } @@ -701,7 +700,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .id(req.id) .subgraph_name(String::default()) @@ -710,11 +709,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!( deserialized_request.subgraph_request_id.as_deref(), Some("5678") @@ -806,7 +804,7 @@ mod tests { assert_eq!("5678", &*response.id); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), response.response.into_body().data.unwrap() ); } @@ -841,7 +839,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .subgraph_name(String::default()) .build()) @@ -878,7 +876,7 @@ mod tests { let request = subgraph::Request::fake_builder().build(); assert_eq!( - serde_json_bytes::json!({ "test": 1234_u32 }), + json!({ "test": 1234_u32 }), service .oneshot(request) .await @@ -1004,7 +1002,7 @@ mod tests { assert_eq!( actual_response, - serde_json::from_value(json!({ + serde_json_bytes::from_value(json!({ "errors": [{ "message": "my error message", "extensions": { @@ -1038,7 +1036,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .id(req.id) .subgraph_name(String::default()) @@ -1133,7 +1131,7 @@ mod tests { ); assert_eq!( - serde_json_bytes::json!({ "test": 5678_u32 }), + json!({ "test": 5678_u32 }), response.response.into_body().data.unwrap() ); } @@ -1163,7 +1161,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .id(req.id) .subgraph_name(String::default()) @@ -1173,7 +1171,7 @@ mod tests { let mock_http_client = mock_with_callback(move |r: http::Request| { Box::pin(async move { let (_, body) = r.into_parts(); - let deserialized_response: Externalizable = + let deserialized_response: Externalizable = serde_json::from_slice(&router::body::into_bytes(body).await.unwrap()).unwrap(); assert_eq!( @@ -1282,7 +1280,7 @@ mod tests { ); assert_eq!( - serde_json_bytes::json!({ "test": 5678_u32 }), + json!({ "test": 5678_u32 }), response.response.into_body().data.unwrap() ); } @@ -1310,7 +1308,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .id(req.id) .subgraph_name(String::default()) @@ -1320,7 +1318,7 @@ mod tests { let mock_http_client = mock_with_callback(move |r: http::Request| { Box::pin(async move { let (_, body) = r.into_parts(); - let deserialized_response: Externalizable = + let deserialized_response: Externalizable = serde_json::from_slice(&router::body::into_bytes(body).await.unwrap()).unwrap(); assert_eq!( @@ -1438,7 +1436,7 @@ mod tests { ); assert_eq!( - serde_json_bytes::json!({ "test": 5678_u32 }), + json!({ "test": 5678_u32 }), response.response.into_body().data.unwrap() ); } @@ -1472,7 +1470,7 @@ mod tests { Ok(subgraph::Response::builder() .data(json!({ "test": 1234_u32 })) .errors(Vec::new()) - .extensions(crate::json_ext::Object::new()) + .extensions(Object::new()) .context(req.context) .subgraph_name(String::default()) .build()) @@ -1557,7 +1555,7 @@ mod tests { ); assert_eq!( - serde_json_bytes::json!({ "test": 5678_u32 }), + json!({ "test": 5678_u32 }), response.response.into_body().data.unwrap() ); } @@ -1659,7 +1657,7 @@ mod tests { mock_with_deferred_callback(move |req: http::Request| { Box::pin(async { let (_, body) = req.into_parts(); - let deserialized_response: Externalizable = + let deserialized_response: Externalizable = serde_json::from_slice(&router::body::into_bytes(body).await.unwrap()) .unwrap(); let context = deserialized_response.context.unwrap_or_default(); @@ -1763,7 +1761,7 @@ mod tests { mock_with_deferred_callback(move |req: http::Request| { Box::pin(async { let (_, body) = req.into_parts(); - let deserialized_response: Externalizable = + let deserialized_response: Externalizable = serde_json::from_slice(&router::body::into_bytes(body).await.unwrap()) .unwrap(); let context = deserialized_response.context.unwrap_or_default(); @@ -1877,11 +1875,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_request.version); assert_eq!( @@ -2002,11 +1999,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!( deserialized_request @@ -2157,11 +2153,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_request.version); assert_eq!( @@ -2285,11 +2280,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_request.version); assert_eq!( @@ -2386,11 +2380,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_request.version); assert_eq!( @@ -2445,7 +2438,7 @@ mod tests { assert_eq!("a value", value); - let actual_response = serde_json::from_slice::( + let actual_response = serde_json::from_slice::( &router::body::into_bytes(response.into_body()) .await .unwrap(), @@ -2481,11 +2474,10 @@ mod tests { let mock_http_client = mock_with_callback(move |req: http::Request| { Box::pin(async { - let deserialized_request: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(req.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_request: Externalizable = serde_json::from_slice( + &router::body::into_bytes(req.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_request.version); assert_eq!( @@ -2528,7 +2520,7 @@ mod tests { .response; assert_eq!(response.status(), http::StatusCode::UNAUTHORIZED); - let actual_response = serde_json::from_slice::( + let actual_response = serde_json::from_slice::( &router::body::into_bytes(response.into_body()) .await .unwrap(), @@ -2574,11 +2566,10 @@ mod tests { let mock_http_client = mock_with_deferred_callback(move |res: http::Request| { Box::pin(async { - let deserialized_response: Externalizable = - serde_json::from_slice( - &router::body::into_bytes(res.into_body()).await.unwrap(), - ) - .unwrap(); + let deserialized_response: Externalizable = serde_json::from_slice( + &router::body::into_bytes(res.into_body()).await.unwrap(), + ) + .unwrap(); assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); assert_eq!( @@ -2675,7 +2666,7 @@ mod tests { // the body should have changed: assert_eq!( json!({ "data": { "test": 42_u32 } }), - serde_json::from_slice::( + serde_json::from_slice::( &router::body::into_bytes(res.response.into_body()) .await .unwrap() diff --git a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs index f35c9d5094..1def4a8722 100644 --- a/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs +++ b/apollo-router/src/plugins/demand_control/cost_calculator/static_cost.rs @@ -819,7 +819,7 @@ mod tests { .as_object() .cloned() .unwrap_or_default(); - let response = Response::from_bytes("test", Bytes::from(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from(response_bytes)).unwrap(); let schema = DemandControlledSchema::new(Arc::new(schema.supergraph_schema().clone())).unwrap(); StaticCostCalculator::new(Arc::new(schema), Default::default(), 100) @@ -847,7 +847,7 @@ mod tests { .as_object() .cloned() .unwrap_or_default(); - let response = Response::from_bytes("test", Bytes::from(response_bytes)).unwrap(); + let response = Response::from_bytes(Bytes::from(response_bytes)).unwrap(); let schema = DemandControlledSchema::new(Arc::new(schema)).unwrap(); StaticCostCalculator::new(Arc::new(schema), Default::default(), 100) diff --git a/apollo-router/src/services/subgraph_service.rs b/apollo-router/src/services/subgraph_service.rs index d5283ae422..5ce4fba180 100644 --- a/apollo-router/src/services/subgraph_service.rs +++ b/apollo-router/src/services/subgraph_service.rs @@ -751,7 +751,11 @@ fn http_response_to_graphql_response( // Application json expects valid graphql response if 2xx tracing::debug_span!("parse_subgraph_response").in_scope(|| { // Application graphql json expects valid graphql response - graphql::Response::from_bytes(service_name, body).unwrap_or_else(|error| { + graphql::Response::from_bytes(body).unwrap_or_else(|error| { + let error = FetchError::SubrequestMalformedResponse { + service: service_name.to_owned(), + reason: error.reason, + }; graphql::Response::builder() .error(error.to_graphql_error(None)) .build() @@ -767,7 +771,7 @@ fn http_response_to_graphql_response( if original_response.is_empty() { original_response = "".into() } - graphql::Response::from_bytes(service_name, body).unwrap_or_else(|_error| { + graphql::Response::from_bytes(body).unwrap_or_else(|_error| { graphql::Response::builder() .error( FetchError::SubrequestMalformedResponse { diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index 353932ec22..79dc309d38 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -103,11 +103,11 @@ async fn test_coprocessor_response_handling() -> Result<(), BoxError> { test_full_pipeline(500, "RouterRequest", empty_body_object).await; test_full_pipeline(500, "RouterResponse", empty_body_object).await; test_full_pipeline(200, "SupergraphRequest", empty_body_object).await; - test_full_pipeline(200, "SupergraphResponse", empty_body_object).await; + test_full_pipeline(500, "SupergraphResponse", empty_body_object).await; test_full_pipeline(200, "SubgraphRequest", empty_body_object).await; test_full_pipeline(200, "SubgraphResponse", empty_body_object).await; test_full_pipeline(200, "ExecutionRequest", empty_body_object).await; - test_full_pipeline(200, "ExecutionResponse", empty_body_object).await; + test_full_pipeline(500, "ExecutionResponse", empty_body_object).await; test_full_pipeline(200, "RouterRequest", remove_body).await; test_full_pipeline(200, "RouterResponse", remove_body).await; @@ -130,20 +130,12 @@ async fn test_coprocessor_response_handling() -> Result<(), BoxError> { } fn empty_body_object(mut body: serde_json::Value) -> serde_json::Value { - *body - .as_object_mut() - .expect("body") - .get_mut("body") - .expect("body") = serde_json::Value::Object(serde_json::Map::new()); + *body.pointer_mut("/body").expect("body") = json!({}); body } fn empty_body_string(mut body: serde_json::Value) -> serde_json::Value { - *body - .as_object_mut() - .expect("body") - .get_mut("body") - .expect("body") = serde_json::Value::String("".to_string()); + *body.pointer_mut("/body").expect("body") = json!(""); body } @@ -153,7 +145,7 @@ fn remove_body(mut body: serde_json::Value) -> serde_json::Value { } fn null_out_response(_body: serde_json::Value) -> serde_json::Value { - serde_json::Value::String("".to_string()) + json!("") } async fn test_full_pipeline( @@ -274,3 +266,51 @@ async fn test_coprocessor_demand_control_access() -> Result<(), BoxError> { Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +async fn test_coprocessor_proxying_error_response() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + + let mock_coprocessor = wiremock::MockServer::start().await; + let coprocessor_address = mock_coprocessor.uri(); + + Mock::given(method("POST")) + .and(path("/")) + .respond_with(|req: &wiremock::Request| { + let body = req.body_json::().expect("body"); + ResponseTemplate::new(200).set_body_json(body) + }) + .mount(&mock_coprocessor) + .await; + + let mut router = IntegrationTest::builder() + .config( + include_str!("fixtures/coprocessor.router.yaml") + .replace("", &coprocessor_address), + ) + .responder(ResponseTemplate::new(200).set_body_json(json!({ + "errors": [{ "message": "subgraph error", "path": [] }], + "data": null + }))) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let (_trace_id, response) = router.execute_default_query().await; + assert_eq!(response.status(), 200); + assert_eq!( + response.json::().await?, + json!({ + "errors": [{ "message": "Subgraph errors redacted", "path": [] }], + "data": null + }) + ); + + router.graceful_shutdown().await; + + Ok(()) +}