Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions apollo-router/src/axum_factory/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,17 @@ mod tests {
.unwrap();

let endpoint = service_fn(|req: router::Request| async move {
Ok::<_, BoxError>(router::Response {
response: http::Response::builder()
.body::<crate::services::router::Body>(body::from_bytes(
"this is a test".to_string(),
))
Ok::<_, BoxError>(
router::Response::http_response_builder()
.response(
http::Response::builder().body::<crate::services::router::Body>(
body::from_bytes("this is a test".to_string()),
)?,
)
.context(req.context)
.build()
.unwrap(),
context: req.context,
})
)
})
.boxed();

Expand Down Expand Up @@ -591,14 +594,14 @@ mod tests {
.build()
.unwrap();
let endpoint = service_fn(|req: router::Request| async move {
Ok::<_, BoxError>(router::Response {
response: http::Response::builder()
.body::<crate::services::router::Body>(body::from_bytes(
"this is a test".to_string(),
))
.unwrap(),
context: req.context,
})
router::Response::http_response_builder()
.response(
http::Response::builder().body::<crate::services::router::Body>(
body::from_bytes("this is a test".to_string()),
)?,
)
.context(req.context)
.build()
})
.boxed();

Expand Down
116 changes: 68 additions & 48 deletions apollo-router/src/plugins/cache/invalidation_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use super::invalidation::Invalidation;
use super::invalidation::InvalidationOrigin;
use crate::ListenAddr;
use crate::configuration::subgraph::SubgraphConfiguration;
use crate::graphql;
use crate::plugins::cache::invalidation::InvalidationRequest;
use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
Expand Down Expand Up @@ -108,13 +109,16 @@ impl Service<router::Request> for InvalidationService {
let (parts, body) = req.router_request.into_parts();
if !parts.headers.contains_key(AUTHORIZATION) {
Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
return Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(router::body::from_bytes("Missing authorization header"))
.map_err(BoxError::from)?,
context: req.context,
});
return router::Response::error_builder()
.status_code(StatusCode::UNAUTHORIZED)
.error(
graphql::Error::builder()
.message(String::from("Missing authorization header"))
.extension_code(StatusCode::UNAUTHORIZED.to_string())
.build(),
)
.context(req.context)
.build();
}
match parts.method {
Method::POST => {
Expand Down Expand Up @@ -156,66 +160,82 @@ impl Service<router::Request> for InvalidationService {
if !valid_shared_key {
Span::current()
.record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
return Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(router::body::from_bytes(
"Invalid authorization header",
))
.map_err(BoxError::from)?,
context: req.context,
});
return router::Response::error_builder()
.status_code(StatusCode::UNAUTHORIZED)
.error(
graphql::Error::builder()
.message("Invalid authorization header")
.extension_code(
StatusCode::UNAUTHORIZED.to_string(),
)
.build(),
)
.context(req.context)
.build();
}
match invalidation
.invalidate(InvalidationOrigin::Endpoint, body)
.instrument(tracing::info_span!("invalidate"))
.await
{
Ok(count) => Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::ACCEPTED)
.body(router::body::from_bytes(serde_json::to_string(
&json!({
"count": count
}),
)?))
.map_err(BoxError::from)?,
context: req.context,
}),
Ok(count) => router::Response::http_response_builder()
.response(
http::Response::builder()
.status(StatusCode::ACCEPTED)
.body(router::body::from_bytes(
serde_json::to_string(&json!({
"count": count
}))?,
))
.map_err(BoxError::from)?,
)
.context(req.context)
.build(),
Err(err) => {
Span::current()
.record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(router::body::from_bytes(err.to_string()))
.map_err(BoxError::from)?,
context: req.context,
})
router::Response::error_builder()
.status_code(StatusCode::BAD_REQUEST)
.error(
graphql::Error::builder()
.message(err.to_string())
.extension_code(
StatusCode::BAD_REQUEST.to_string(),
)
.build(),
)
.context(req.context)
.build()
}
}
}
Err(err) => {
Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(router::body::from_bytes(err))
.map_err(BoxError::from)?,
context: req.context,
})
router::Response::error_builder()
.status_code(StatusCode::BAD_REQUEST)
.error(
graphql::Error::builder()
.message(err)
.extension_code(StatusCode::BAD_REQUEST.to_string())
.build(),
)
.context(req.context)
.build()
}
}
}
_ => {
Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
Ok(router::Response {
response: http::Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(router::body::from_bytes("".to_string()))
.map_err(BoxError::from)?,
context: req.context,
})
router::Response::error_builder()
.status_code(StatusCode::METHOD_NOT_ALLOWED)
.error(
graphql::Error::builder()
.message("".to_string())
.extension_code(StatusCode::METHOD_NOT_ALLOWED.to_string())
.build(),
)
.context(req.context)
.build()
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions apollo-router/src/plugins/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,11 +1104,11 @@ where
.map(|b| b.map(http_body::Frame::data).map_err(axum::Error::new)),
));

// Finally, return a response which has a Body that wraps our stream of response chunks.
Ok(router::Response {
context,
response: http::Response::from_parts(parts, final_stream),
})
// Finally, return a response which has a Body that wraps our stream of response chunks
router::Response::http_response_builder()
.context(context)
.response(http::Response::from_parts(parts, final_stream))
.build()
}
// -----------------------------------------------------------------------------------------------------

Expand Down
70 changes: 40 additions & 30 deletions apollo-router/src/plugins/fleet_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,19 +270,22 @@ impl PluginPrivate for FleetDetector {
context: req.context,
})
// Count the number of response bytes from the router to clients
.map_response(move |res: router::Response| router::Response {
response: res.response.map(move |body| {
router::body::from_result_stream(body.into_data_stream().inspect(|res| {
if let Ok(bytes) = res {
u64_counter!(
"apollo.router.operations.response_size",
"Total number of response bytes to clients",
bytes.len() as u64
);
}
.map_response(move |res: router::Response| {
router::Response::http_response_builder()
.response(res.response.map(move |body| {
router::body::from_result_stream(body.into_data_stream().inspect(|res| {
if let Ok(bytes) = res {
u64_counter!(
"apollo.router.operations.response_size",
"Total number of response bytes to clients",
bytes.len() as u64
);
}
}))
}))
}),
context: res.context,
.context(res.context)
.build()
.unwrap()
})
.boxed()
}
Expand Down Expand Up @@ -532,6 +535,7 @@ mod tests {
use tower::Service as _;

use super::*;
use crate::graphql;
use crate::metrics::FutureMetricsExt as _;
use crate::metrics::collect_metrics;
use crate::metrics::test_utils::MetricType;
Expand All @@ -551,15 +555,19 @@ mod tests {
.expect_call()
.times(1)
.returning(|req: router::Request| {
Ok(router::Response {
context: req.context,
response: http::Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("content-type", "application/json")
// making sure the request body is consumed
.body(req.router_request.into_body())
.unwrap(),
})
// making sure the request body is consumed
req.router_request.into_body();
router::Response::error_builder()
.context(req.context)
.status_code(StatusCode::BAD_REQUEST)
.header("content-type", "application/json")
.error(
graphql::Error::builder()
.message("bad request")
.extension_code(StatusCode::BAD_REQUEST.to_string())
.build(),
)
.build()
});
let mut bad_request_router_service =
plugin.router_service(mock_bad_request_service.boxed());
Expand Down Expand Up @@ -609,15 +617,17 @@ mod tests {
.expect_call()
.times(1)
.returning(|req: router::Request| {
Ok(router::Response {
context: req.context,
response: http::Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("content-type", "application/json")
// making sure the request body is consumed
.body(req.router_request.into_body())
.unwrap(),
})
router::Response::http_response_builder()
.context(req.context)
.response(
http::Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("content-type", "application/json")
// making sure the request body is consumed
.body(req.router_request.into_body())
.unwrap(),
)
.build()
});
let mut bad_request_router_service =
plugin.router_service(mock_bad_request_service.boxed());
Expand Down
10 changes: 5 additions & 5 deletions apollo-router/src/plugins/healthcheck/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,14 @@ impl PluginPrivate for HealthCheck {
};
tracing::trace!(?health, request = ?req.router_request, "health check");
async move {
Ok(router::Response {
response: http::Response::builder().status(status_code).body(
router::Response::http_response_builder()
.response(http::Response::builder().status(status_code).body(
router::body::from_bytes(
serde_json::to_vec(&health).map_err(BoxError::from)?,
),
)?,
context: req.context,
})
)?)
.context(req.context)
.build()
}
})
.boxed(),
Expand Down
10 changes: 5 additions & 5 deletions apollo-router/src/plugins/record_replay/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ impl Plugin for Record {

let stream = stream.into_data_stream().chain(after_complete);

Ok(router::Response {
context: res.context,
response: http::Response::from_parts(
router::Response::http_response_builder()
.context(res.context)
.response(http::Response::from_parts(
parts,
router::body::from_result_stream(stream),
),
})
))
.build()
}
})
.service(service)
Expand Down
Loading