diff --git a/apollo-router/src/axum_factory/listeners.rs b/apollo-router/src/axum_factory/listeners.rs index e77ac0d20c..5cbe7891da 100644 --- a/apollo-router/src/axum_factory/listeners.rs +++ b/apollo-router/src/axum_factory/listeners.rs @@ -550,14 +550,17 @@ mod tests { .unwrap(); let endpoint = service_fn(|req: router::Request| async move { - Ok::<_, BoxError>(router::Response { - response: http::Response::builder() - .body::(body::from_bytes( - "this is a test".to_string(), - )) + Ok::<_, BoxError>( + router::Response::http_response_builder() + .response( + http::Response::builder().body::( + body::from_bytes("this is a test".to_string()), + )?, + ) + .context(req.context) + .build() .unwrap(), - context: req.context, - }) + ) }) .boxed(); @@ -591,14 +594,14 @@ mod tests { .build() .unwrap(); let endpoint = service_fn(|req: router::Request| async move { - Ok::<_, BoxError>(router::Response { - response: http::Response::builder() - .body::(body::from_bytes( - "this is a test".to_string(), - )) - .unwrap(), - context: req.context, - }) + router::Response::http_response_builder() + .response( + http::Response::builder().body::( + body::from_bytes("this is a test".to_string()), + )?, + ) + .context(req.context) + .build() }) .boxed(); diff --git a/apollo-router/src/plugins/cache/invalidation_endpoint.rs b/apollo-router/src/plugins/cache/invalidation_endpoint.rs index 77e76edec9..d08f5dcaa0 100644 --- a/apollo-router/src/plugins/cache/invalidation_endpoint.rs +++ b/apollo-router/src/plugins/cache/invalidation_endpoint.rs @@ -20,6 +20,7 @@ use super::invalidation::Invalidation; use super::invalidation::InvalidationOrigin; use crate::ListenAddr; use crate::configuration::subgraph::SubgraphConfiguration; +use crate::graphql; use crate::plugins::cache::invalidation::InvalidationRequest; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR; @@ -108,13 +109,16 @@ impl Service for InvalidationService { let (parts, body) = req.router_request.into_parts(); if !parts.headers.contains_key(AUTHORIZATION) { Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body(router::body::from_bytes("Missing authorization header")) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::UNAUTHORIZED) + .error( + graphql::Error::builder() + .message(String::from("Missing authorization header")) + .extension_code(StatusCode::UNAUTHORIZED.to_string()) + .build(), + ) + .context(req.context) + .build(); } match parts.method { Method::POST => { @@ -156,66 +160,82 @@ impl Service for InvalidationService { if !valid_shared_key { Span::current() .record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body(router::body::from_bytes( - "Invalid authorization header", - )) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::UNAUTHORIZED) + .error( + graphql::Error::builder() + .message("Invalid authorization header") + .extension_code( + StatusCode::UNAUTHORIZED.to_string(), + ) + .build(), + ) + .context(req.context) + .build(); } match invalidation .invalidate(InvalidationOrigin::Endpoint, body) .instrument(tracing::info_span!("invalidate")) .await { - Ok(count) => Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::ACCEPTED) - .body(router::body::from_bytes(serde_json::to_string( - &json!({ - "count": count - }), - )?)) - .map_err(BoxError::from)?, - context: req.context, - }), + Ok(count) => router::Response::http_response_builder() + .response( + http::Response::builder() + .status(StatusCode::ACCEPTED) + .body(router::body::from_bytes( + serde_json::to_string(&json!({ + "count": count + }))?, + )) + .map_err(BoxError::from)?, + ) + .context(req.context) + .build(), Err(err) => { Span::current() .record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(router::body::from_bytes(err.to_string())) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::BAD_REQUEST) + .error( + graphql::Error::builder() + .message(err.to_string()) + .extension_code( + StatusCode::BAD_REQUEST.to_string(), + ) + .build(), + ) + .context(req.context) + .build() } } } Err(err) => { Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(router::body::from_bytes(err)) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::BAD_REQUEST) + .error( + graphql::Error::builder() + .message(err) + .extension_code(StatusCode::BAD_REQUEST.to_string()) + .build(), + ) + .context(req.context) + .build() } } } _ => { Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::METHOD_NOT_ALLOWED) - .body(router::body::from_bytes("".to_string())) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::METHOD_NOT_ALLOWED) + .error( + graphql::Error::builder() + .message("".to_string()) + .extension_code(StatusCode::METHOD_NOT_ALLOWED.to_string()) + .build(), + ) + .context(req.context) + .build() } } } diff --git a/apollo-router/src/plugins/coprocessor/mod.rs b/apollo-router/src/plugins/coprocessor/mod.rs index 6d0b881324..d6b186caab 100644 --- a/apollo-router/src/plugins/coprocessor/mod.rs +++ b/apollo-router/src/plugins/coprocessor/mod.rs @@ -1104,11 +1104,11 @@ where .map(|b| b.map(http_body::Frame::data).map_err(axum::Error::new)), )); - // Finally, return a response which has a Body that wraps our stream of response chunks. - Ok(router::Response { - context, - response: http::Response::from_parts(parts, final_stream), - }) + // Finally, return a response which has a Body that wraps our stream of response chunks + router::Response::http_response_builder() + .context(context) + .response(http::Response::from_parts(parts, final_stream)) + .build() } // ----------------------------------------------------------------------------------------------------- diff --git a/apollo-router/src/plugins/fleet_detector.rs b/apollo-router/src/plugins/fleet_detector.rs index 30c41bc6ff..0d08ec83ac 100644 --- a/apollo-router/src/plugins/fleet_detector.rs +++ b/apollo-router/src/plugins/fleet_detector.rs @@ -270,19 +270,22 @@ impl PluginPrivate for FleetDetector { context: req.context, }) // Count the number of response bytes from the router to clients - .map_response(move |res: router::Response| router::Response { - response: res.response.map(move |body| { - router::body::from_result_stream(body.into_data_stream().inspect(|res| { - if let Ok(bytes) = res { - u64_counter!( - "apollo.router.operations.response_size", - "Total number of response bytes to clients", - bytes.len() as u64 - ); - } + .map_response(move |res: router::Response| { + router::Response::http_response_builder() + .response(res.response.map(move |body| { + router::body::from_result_stream(body.into_data_stream().inspect(|res| { + if let Ok(bytes) = res { + u64_counter!( + "apollo.router.operations.response_size", + "Total number of response bytes to clients", + bytes.len() as u64 + ); + } + })) })) - }), - context: res.context, + .context(res.context) + .build() + .unwrap() }) .boxed() } @@ -532,6 +535,7 @@ mod tests { use tower::Service as _; use super::*; + use crate::graphql; use crate::metrics::FutureMetricsExt as _; use crate::metrics::collect_metrics; use crate::metrics::test_utils::MetricType; @@ -551,15 +555,19 @@ mod tests { .expect_call() .times(1) .returning(|req: router::Request| { - Ok(router::Response { - context: req.context, - response: http::Response::builder() - .status(StatusCode::BAD_REQUEST) - .header("content-type", "application/json") - // making sure the request body is consumed - .body(req.router_request.into_body()) - .unwrap(), - }) + // making sure the request body is consumed + req.router_request.into_body(); + router::Response::error_builder() + .context(req.context) + .status_code(StatusCode::BAD_REQUEST) + .header("content-type", "application/json") + .error( + graphql::Error::builder() + .message("bad request") + .extension_code(StatusCode::BAD_REQUEST.to_string()) + .build(), + ) + .build() }); let mut bad_request_router_service = plugin.router_service(mock_bad_request_service.boxed()); @@ -609,15 +617,17 @@ mod tests { .expect_call() .times(1) .returning(|req: router::Request| { - Ok(router::Response { - context: req.context, - response: http::Response::builder() - .status(StatusCode::BAD_REQUEST) - .header("content-type", "application/json") - // making sure the request body is consumed - .body(req.router_request.into_body()) - .unwrap(), - }) + router::Response::http_response_builder() + .context(req.context) + .response( + http::Response::builder() + .status(StatusCode::BAD_REQUEST) + .header("content-type", "application/json") + // making sure the request body is consumed + .body(req.router_request.into_body()) + .unwrap(), + ) + .build() }); let mut bad_request_router_service = plugin.router_service(mock_bad_request_service.boxed()); diff --git a/apollo-router/src/plugins/healthcheck/mod.rs b/apollo-router/src/plugins/healthcheck/mod.rs index 6c399fe8ca..6645e3f75f 100644 --- a/apollo-router/src/plugins/healthcheck/mod.rs +++ b/apollo-router/src/plugins/healthcheck/mod.rs @@ -290,14 +290,14 @@ impl PluginPrivate for HealthCheck { }; tracing::trace!(?health, request = ?req.router_request, "health check"); async move { - Ok(router::Response { - response: http::Response::builder().status(status_code).body( + router::Response::http_response_builder() + .response(http::Response::builder().status(status_code).body( router::body::from_bytes( serde_json::to_vec(&health).map_err(BoxError::from)?, ), - )?, - context: req.context, - }) + )?) + .context(req.context) + .build() } }) .boxed(), diff --git a/apollo-router/src/plugins/record_replay/record.rs b/apollo-router/src/plugins/record_replay/record.rs index 0123be7229..dde27a641d 100644 --- a/apollo-router/src/plugins/record_replay/record.rs +++ b/apollo-router/src/plugins/record_replay/record.rs @@ -127,13 +127,13 @@ impl Plugin for Record { let stream = stream.into_data_stream().chain(after_complete); - Ok(router::Response { - context: res.context, - response: http::Response::from_parts( + router::Response::http_response_builder() + .context(res.context) + .response(http::Response::from_parts( parts, router::body::from_result_stream(stream), - ), - }) + )) + .build() } }) .service(service) diff --git a/apollo-router/src/plugins/response_cache/invalidation_endpoint.rs b/apollo-router/src/plugins/response_cache/invalidation_endpoint.rs index a92ff2e469..d861b4ec79 100644 --- a/apollo-router/src/plugins/response_cache/invalidation_endpoint.rs +++ b/apollo-router/src/plugins/response_cache/invalidation_endpoint.rs @@ -20,6 +20,7 @@ use super::invalidation::InvalidationOrigin; use super::plugin::Subgraph; use crate::ListenAddr; use crate::configuration::subgraph::SubgraphConfiguration; +use crate::graphql; use crate::plugins::response_cache::invalidation::InvalidationRequest; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE; use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR; @@ -108,13 +109,16 @@ impl Service for InvalidationService { let (parts, body) = req.router_request.into_parts(); if !parts.headers.contains_key(AUTHORIZATION) { Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body(router::body::from_bytes("Missing authorization header")) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::UNAUTHORIZED) + .error( + graphql::Error::builder() + .message(String::from("Missing authorization header")) + .extension_code(StatusCode::UNAUTHORIZED.to_string()) + .build(), + ) + .context(req.context) + .build(); } match parts.method { Method::POST => { @@ -156,66 +160,84 @@ impl Service for InvalidationService { if !valid_shared_key { Span::current() .record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body(router::body::from_bytes( - "Invalid authorization header", - )) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::UNAUTHORIZED) + .error( + graphql::Error::builder() + .message(String::from( + "Invalid authorization header", + )) + .extension_code( + StatusCode::UNAUTHORIZED.to_string(), + ) + .build(), + ) + .context(req.context) + .build(); } match invalidation .invalidate(InvalidationOrigin::Endpoint, body) .instrument(tracing::info_span!("invalidate")) .await { - Ok(count) => Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::ACCEPTED) - .body(router::body::from_bytes(serde_json::to_string( - &json!({ - "count": count - }), - )?)) - .map_err(BoxError::from)?, - context: req.context, - }), + Ok(count) => router::Response::http_response_builder() + .response( + http::Response::builder() + .status(StatusCode::ACCEPTED) + .body(router::body::from_bytes( + serde_json::to_string(&json!({ + "count": count + }))?, + )) + .map_err(BoxError::from)?, + ) + .context(req.context) + .build(), Err(err) => { Span::current() .record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(router::body::from_bytes(err.to_string())) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::BAD_REQUEST) + .error( + graphql::Error::builder() + .message(err.to_string()) + .extension_code( + StatusCode::BAD_REQUEST.to_string(), + ) + .build(), + ) + .context(req.context) + .build() } } } Err(err) => { Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(router::body::from_bytes(err)) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::BAD_REQUEST) + .error( + graphql::Error::builder() + .message(err) + .extension_code(StatusCode::BAD_REQUEST.to_string()) + .build(), + ) + .context(req.context) + .build() } } } _ => { Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR); - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::METHOD_NOT_ALLOWED) - .body(router::body::from_bytes("".to_string())) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::METHOD_NOT_ALLOWED) + .error( + graphql::Error::builder() + .message("".to_string()) + .extension_code(StatusCode::METHOD_NOT_ALLOWED.to_string()) + .build(), + ) + .context(req.context) + .build() } } } diff --git a/apollo-router/src/plugins/subscription.rs b/apollo-router/src/plugins/subscription.rs index e5865364ae..6d008b81b5 100644 --- a/apollo-router/src/plugins/subscription.rs +++ b/apollo-router/src/plugins/subscription.rs @@ -465,13 +465,15 @@ impl Service for CallbackService { let cb_body = match cb_body { Ok(cb_body) => cb_body, Err(err) => { - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(router::body::from_bytes(err)) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::BAD_REQUEST) + .error(graphql::Error::builder() + .message(err) + .extension_code(StatusCode::BAD_REQUEST.to_string()) + .build() + ) + .context(req.context) + .build(); } }; let id = cb_body.id().clone(); @@ -492,13 +494,15 @@ impl Service for CallbackService { let expected_hashed_verifier = verifier_hasher.finalize(); if hashed_verifier != expected_hashed_verifier { - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body(router::body::from_bytes("verifier doesn't match")) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::UNAUTHORIZED) + .error(graphql::Error::builder() + .message("verifier doesn't match") + .extension_code(StatusCode::UNAUTHORIZED.to_string()) + .build() + ) + .context(req.context) + .build(); } if let Err(res) = ensure_id_consistency(&req.context, &sub_id, &id) { @@ -513,13 +517,15 @@ impl Service for CallbackService { let mut handle = match notify.subscribe_if_exist(id).await? { Some(handle) => handle.into_sink(), None => { - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NOT_FOUND) - .body(router::body::from_bytes("suscription doesn't exist")) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::NOT_FOUND) + .error(graphql::Error::builder() + .message("subscription doesn't exist") + .extension_code(StatusCode::NOT_FOUND.to_string()) + .build() + ) + .context(req.context) + .build(); } }; // Keep the subscription to the client opened @@ -532,35 +538,35 @@ impl Service for CallbackService { ); handle.send_sync(*payload)?; - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::OK) - .body(router::body::empty()) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::builder() + .context(req.context) + .build() } CallbackPayload::Subscription(SubscriptionPayload::Check { .. }) => { if notify.exist(id).await? { - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NO_CONTENT) - .header(HeaderName::from_static(CALLBACK_SUBSCRIPTION_HEADER_NAME), HeaderValue::from_static(CALLBACK_SUBSCRIPTION_HEADER_VALUE)) - .body(router::body::empty()) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::NO_CONTENT) + .header(HeaderName::from_static(CALLBACK_SUBSCRIPTION_HEADER_NAME), HeaderValue::from_static(CALLBACK_SUBSCRIPTION_HEADER_VALUE)) + .error(graphql::Error::builder() + .message(String::default()) + .extension_code(StatusCode::NO_CONTENT.to_string()) + .build() + ) + .context(req.context) + .build() } else { - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NOT_FOUND) - .header(HeaderName::from_static(CALLBACK_SUBSCRIPTION_HEADER_NAME), HeaderValue::from_static(CALLBACK_SUBSCRIPTION_HEADER_VALUE)) - .body(router::body::from_bytes("suscription doesn't exist")) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::NOT_FOUND) + .header(HeaderName::from_static(CALLBACK_SUBSCRIPTION_HEADER_NAME), HeaderValue::from_static(CALLBACK_SUBSCRIPTION_HEADER_VALUE)) + .error(graphql::Error::builder() + .message("subscription doesn't exist") + .extension_code(StatusCode::NOT_FOUND.to_string()) + .build() + ) + .context(req.context) + .build() } } CallbackPayload::Subscription(SubscriptionPayload::Heartbeat { @@ -569,32 +575,38 @@ impl Service for CallbackService { verifier, }) => { if !ids.contains(&id) { - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body(router::body::from_bytes("id used for the verifier is not part of ids array")) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::UNAUTHORIZED) + .error(graphql::Error::builder() + .message("id used for the verifier is not part of ids array") + .extension_code(StatusCode::UNAUTHORIZED.to_string()) + .build() + ) + .context(req.context) + .build() } let (mut valid_ids, invalid_ids) = notify.invalid_ids(ids).await?; if invalid_ids.is_empty() { - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NO_CONTENT) - .body(router::body::empty()) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::NO_CONTENT) + .error(graphql::Error::builder() + .message(String::default()) + .extension_code(StatusCode::NO_CONTENT.to_string()) + .build() + ) + .context(req.context) + .build() } else if valid_ids.is_empty() { - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NOT_FOUND) - .body(router::body::from_bytes("suscriptions don't exist")) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::NOT_FOUND) + .error(graphql::Error::builder() + .message("subscriptions don't exist") + .extension_code(StatusCode::NOT_FOUND.to_string()) + .build() + ) + .context(req.context) + .build() } else { let (id, verifier) = if invalid_ids.contains(&id) { (id, verifier) @@ -610,19 +622,19 @@ impl Service for CallbackService { (new_id, verifier) }; - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NOT_FOUND) - .body(router::body::from_bytes( - serde_json::to_string_pretty(&InvalidIdsPayload{ - invalid_ids, - id, - verifier, - })?, - )) - .map_err(BoxError::from)?, - context: req.context, - }) + router::Response::error_builder() + .status_code(StatusCode::NOT_FOUND) + .error(graphql::Error::builder() + .message(serde_json::to_string_pretty(&InvalidIdsPayload{ + invalid_ids, + id, + verifier, + }).map_err(BoxError::from)?) + .extension_code(StatusCode::NOT_FOUND.to_string()) + .build() + ) + .context(req.context) + .build() } } CallbackPayload::Subscription(SubscriptionPayload::Complete { @@ -633,22 +645,26 @@ impl Service for CallbackService { let mut handle = match notify.subscribe(id.clone()).await { Ok(handle) => handle.into_sink(), Err(NotifyError::UnknownTopic) => { - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NOT_FOUND) - .body(router::body::from_bytes("unknown topic")) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::NOT_FOUND) + .error(graphql::Error::builder() + .message("unknown topic") + .extension_code(StatusCode::NOT_FOUND.to_string()) + .build() + ) + .context(req.context) + .build(); }, Err(err) => { - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NOT_FOUND) - .body(router::body::from_bytes(err.to_string())) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::NOT_FOUND) + .error(graphql::Error::builder() + .message(err.to_string()) + .extension_code(StatusCode::NOT_FOUND.to_string()) + .build() + ) + .context(req.context) + .build(); } }; u64_counter!( @@ -661,41 +677,48 @@ impl Service for CallbackService { if let Err(_err) = handle.send_sync( graphql::Response::builder().errors(errors).build(), ) { - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NOT_FOUND) - .body(router::body::from_bytes("cannot send errors to the client")) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::NOT_FOUND) + .error(graphql::Error::builder() + .message("cannot send errors to the client") + .extension_code(StatusCode::NOT_FOUND.to_string()) + .build() + ) + .context(req.context) + .build(); } } if let Err(_err) = notify.force_delete(id).await { - return Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::NOT_FOUND) - .body(router::body::from_bytes("cannot force delete")) - .map_err(BoxError::from)?, - context: req.context, - }); + return router::Response::error_builder() + .status_code(StatusCode::NOT_FOUND) + .error(graphql::Error::builder() + .message("cannot force delete") + .extension_code(StatusCode::NOT_FOUND.to_string()) + .build() + ) + .context(req.context) + .build(); } - Ok(router::Response { - response: http::Response::builder() + + router::Response::http_response_builder() + .response(http::Response::builder() .status(StatusCode::ACCEPTED) .body(router::body::empty()) - .map_err(BoxError::from)?, - context: req.context, - }) + .map_err(BoxError::from)?) + .context(req.context) + .build() } } } - _ => Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::METHOD_NOT_ALLOWED) - .body(router::body::empty()) - .map_err(BoxError::from)?, - context: req.context, - }), + _ => router::Response::error_builder() + .status_code(StatusCode::METHOD_NOT_ALLOWED) + .error(graphql::Error::builder() + .message(String::default()) + .extension_code(StatusCode::METHOD_NOT_ALLOWED.to_string()) + .build() + ) + .context(req.context) + .build() } } .instrument(tracing::info_span!("subscription_callback")), @@ -722,15 +745,17 @@ fn ensure_id_consistency( id_from_body: &str, ) -> Result<(), router::Response> { if id_from_path != id_from_body { - Err(router::Response { - response: http::Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(router::body::from_bytes( - "id from url path and id from body are different", - )) - .expect("this body is valid"), - context: context.clone(), - }) + Err(router::Response::error_builder() + .status_code(StatusCode::BAD_REQUEST) + .error( + graphql::Error::builder() + .message("id from url path and id from body are different") + .extension_code(StatusCode::BAD_REQUEST.to_string()) + .build(), + ) + .context(context.clone()) + .build() + .expect("this response is valid")) } else { Ok(()) } diff --git a/apollo-router/src/plugins/telemetry/metrics/prometheus.rs b/apollo-router/src/plugins/telemetry/metrics/prometheus.rs index d0521afd76..dd11211962 100644 --- a/apollo-router/src/plugins/telemetry/metrics/prometheus.rs +++ b/apollo-router/src/plugins/telemetry/metrics/prometheus.rs @@ -210,14 +210,17 @@ impl Service for PrometheusService { // Let's remove any problems they may have created for us. let stats = String::from_utf8_lossy(&result); let modified_stats = stats.replace("_total_total", "_total"); - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4") - .body(router::body::from_bytes(modified_stats)) - .map_err(BoxError::from)?, - context: req.context, - }) + + router::Response::http_response_builder() + .response( + http::Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "text/plain; version=0.0.4") + .body(router::body::from_bytes(modified_stats)) + .map_err(BoxError::from)?, + ) + .context(req.context) + .build() }) } } diff --git a/apollo-router/src/services/layers/static_page.rs b/apollo-router/src/services/layers/static_page.rs index a6ec6e91a5..e530554483 100644 --- a/apollo-router/src/services/layers/static_page.rs +++ b/apollo-router/src/services/layers/static_page.rs @@ -59,17 +59,23 @@ where let res = if req.router_request.method() == Method::GET && accepts_html(req.router_request.headers()) { - let response = http::Response::builder() - .header( - CONTENT_TYPE, - HeaderValue::from_static(mime::TEXT_HTML_UTF_8.as_ref()), - ) - .body(router::body::from_bytes(page.clone())) - .unwrap(); - ControlFlow::Break(router::Response { - response, - context: req.context, - }) + ControlFlow::Break( + router::Response::http_response_builder() + .response( + http::Response::builder() + .header( + CONTENT_TYPE, + HeaderValue::from_static( + mime::TEXT_HTML_UTF_8.as_ref(), + ), + ) + .body(router::body::from_bytes(page.clone())) + .unwrap(), + ) + .context(req.context) + .build() + .unwrap(), + ) } else { ControlFlow::Continue(req) }; diff --git a/apollo-router/src/services/router.rs b/apollo-router/src/services/router.rs index 3b6aee6131..05c5f3bb3f 100644 --- a/apollo-router/src/services/router.rs +++ b/apollo-router/src/services/router.rs @@ -269,6 +269,19 @@ impl Response { Ok(response) } + #[builder(visibility = "pub")] + fn http_response_new( + response: http::Response, + context: Context, + body_to_stash: Option, + ) -> Result { + let mut res = Self { response, context }; + if let Some(body_to_stash) = body_to_stash { + res.stash_the_body_in_extensions(body_to_stash) + } + Ok(res) + } + /// This is the constructor (or builder) to use when constructing a Response that represents a global error. /// It has no path and no response data. /// This is useful for things such as authentication errors. diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index f59088b344..30685d10c9 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -23,6 +23,7 @@ use mime::APPLICATION_JSON; use multimap::MultiMap; use opentelemetry::KeyValue; use opentelemetry_semantic_conventions::trace::HTTP_REQUEST_METHOD; +use serde_json_bytes::Value; use tower::BoxError; use tower::ServiceBuilder; use tower::ServiceExt; @@ -43,7 +44,6 @@ use crate::configuration::Batching; use crate::configuration::BatchingMode; use crate::graphql; use crate::http_ext; -use crate::json_ext::Value; use crate::layers::DEFAULT_BUFFER_SIZE; use crate::layers::ServiceBuilderExt; use crate::metrics::count_operation_error_codes; @@ -300,15 +300,19 @@ impl RouterService { match body.next().await { None => { tracing::error!("router service is not available to process request",); - Ok(router::Response { - response: http::Response::builder() - .status(StatusCode::SERVICE_UNAVAILABLE) - .body(router::body::from_bytes( - "router service is not available to process request", - )) - .expect("cannot fail"), - context, - }) + router::Response::error_builder() + .error( + graphql::Error::builder() + .message(String::from( + "router service is not available to process request", + )) + .extension_code(StatusCode::SERVICE_UNAVAILABLE.to_string()) + .build(), + ) + .status_code(StatusCode::SERVICE_UNAVAILABLE) + .header(CONTENT_TYPE, APPLICATION_JSON.essence_str()) + .context(context) + .build() } Some(response) => { if !response.has_next.unwrap_or(false) @@ -331,7 +335,6 @@ impl RouterService { &self.apollo_telemetry_config.errors, ); } - let body: Result = tracing::trace_span!("serialize_response") .in_scope(|| { let body = serde_json::to_string(&response)?; @@ -343,19 +346,18 @@ impl RouterService { .extensions() .with_lock(|ext| ext.get::().is_some()); - let mut res = router::Response { - response: Response::from_parts( + router::Response::http_response_builder() + .response(Response::from_parts( parts, router::body::from_bytes(body.clone()), - ), - context, - }; - - if display_router_response { - res.stash_the_body_in_extensions(body); - } - - Ok(res) + )) + .and_body_to_stash(if display_router_response { + Some(body) + } else { + None + }) + .context(context) + .build() } else if accepts_multipart_defer || accepts_multipart_subscription { if !response.errors.is_empty() { count_operation_errors( @@ -364,7 +366,6 @@ impl RouterService { &self.apollo_telemetry_config.errors, ); } - // Useful when you're using a proxy like nginx which enable proxy_buffering by default (http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering) parts.headers.insert( ACCEL_BUFFERING_HEADER_NAME.clone(), @@ -390,12 +391,13 @@ impl RouterService { } }; - let response = http::Response::from_parts( - parts, - router::body::from_result_stream(response_multipart), - ); - - Ok(RouterResponse { response, context }) + RouterResponse::http_response_builder() + .response(http::Response::from_parts( + parts, + router::body::from_result_stream(response_multipart), + )) + .context(context) + .build() } else { count_operation_error_codes( &["INVALID_ACCEPT_HEADER"], @@ -501,13 +503,13 @@ impl RouterService { } bytes.put_u8(b']'); - Ok(RouterResponse { - response: http::Response::from_parts( + RouterResponse::http_response_builder() + .response(http::Response::from_parts( parts, router::body::from_bytes(bytes.freeze()), - ), - context, - }) + )) + .context(context) + .build() } else { Ok(results.pop().expect("we should have at least one response")) }