Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changesets/fix_caroline_fix_rate_limit_status_code.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### Raise 429 rather than 503 when enforcing rate limits ([PR #8765](https://github.com/apollographql/router/pull/8765))

In the router 2.0 release, the status code of the error raised when a request is rate limited was changed from 429 (`TOO_MANY_REQUESTS`) to 503 (`SERVICE_UNAVAILABLE`). This change reverts that modification so the router again matches its [documentation](https://www.apollographql.com/docs/graphos/routing/errors#429).

By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/8765
179 changes: 104 additions & 75 deletions apollo-router/src/plugins/traffic_shaping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,29 +262,22 @@ impl PluginPrivate for TrafficShaping {
}

fn router_service(&self, service: router::BoxService) -> router::BoxService {
// NB: consider each triplet (map_future_with_request_data, load_shed, layer) as a unit of
// behavior
ServiceBuilder::new()
.map_future_with_request_data(
|req: &router::Request| req.context.clone(),
move |ctx, future| {
async {
let response: Result<RouterResponse, BoxError> = future.await;
match response {
Ok(ok) => Ok(ok),
Err(err) if err.is::<Elapsed>() => {
// TODO add metrics
let error = graphql::Error::builder()
.message("Your request has been timed out")
.extension_code("GATEWAY_TIMEOUT")
.build();
Ok(RouterResponse::error_builder()
.status_code(StatusCode::GATEWAY_TIMEOUT)
.error(error)
.context(ctx)
.build()
.expect("should build overloaded response"))
}
Err(err) => Err(err),
}
move |ctx, future| async {
let response: Result<RouterResponse, BoxError> = future.await;
if matches!(response, Err(ref err) if err.is::<Elapsed>()) {
Ok(RouterResponse::error_builder()
.status_code(StatusCode::GATEWAY_TIMEOUT)
.error(gateway_timeout_error())
.context(ctx)
.build()
.expect("should build overloaded response"))
} else {
response
}
},
)
Expand All @@ -298,26 +291,17 @@ impl PluginPrivate for TrafficShaping {
))
.map_future_with_request_data(
|req: &router::Request| req.context.clone(),
move |ctx, future| {
async {
let response: Result<RouterResponse, BoxError> = future.await;
match response {
Ok(ok) => Ok(ok),
Err(err) if err.is::<Overloaded>() => {
// TODO add metrics
let error = graphql::Error::builder()
.message("Your request has been concurrency limited")
.extension_code("REQUEST_CONCURRENCY_LIMITED")
.build();
Ok(RouterResponse::error_builder()
.status_code(StatusCode::SERVICE_UNAVAILABLE)
.error(error)
.context(ctx)
.build()
.expect("should build overloaded response"))
}
Err(err) => Err(err),
}
move |ctx, future| async {
let response: Result<RouterResponse, BoxError> = future.await;
if matches!(response, Err(ref err) if err.is::<Overloaded>()) {
Ok(RouterResponse::error_builder()
.status_code(StatusCode::SERVICE_UNAVAILABLE)
.error(concurrency_limit_error())
.context(ctx)
.build()
.expect("should build overloaded response"))
} else {
response
}
},
)
Expand All @@ -330,26 +314,17 @@ impl PluginPrivate for TrafficShaping {
}))
.map_future_with_request_data(
|req: &router::Request| req.context.clone(),
move |ctx, future| {
async {
let response: Result<RouterResponse, BoxError> = future.await;
match response {
Ok(ok) => Ok(ok),
Err(err) if err.is::<Overloaded>() => {
// TODO add metrics
let error = graphql::Error::builder()
.message("Your request has been rate limited")
.extension_code("REQUEST_RATE_LIMITED")
.build();
Ok(RouterResponse::error_builder()
.status_code(StatusCode::SERVICE_UNAVAILABLE)
.error(error)
.context(ctx)
.build()
.expect("should build overloaded response"))
}
Err(err) => Err(err),
}
move |ctx, future| async {
let response: Result<RouterResponse, BoxError> = future.await;
if matches!(response, Err(ref err) if err.is::<Overloaded>()) {
Ok(RouterResponse::error_builder()
.status_code(StatusCode::TOO_MANY_REQUESTS)
.error(rate_limit_error())
.context(ctx)
.build()
.expect("should build overloaded response"))
} else {
response
}
},
)
Expand Down Expand Up @@ -395,34 +370,25 @@ impl PluginPrivate for TrafficShaping {
async {
let response: Result<SubgraphResponse, BoxError> = future.await;
match response {
Ok(ok) => Ok(ok),
Err(err) if err.is::<Elapsed>() => {
// TODO add metrics
let error = graphql::Error::builder()
.message("Your request has been timed out")
.extension_code("GATEWAY_TIMEOUT")
.build();
Ok(SubgraphResponse::error_builder()
.status_code(StatusCode::GATEWAY_TIMEOUT)
.subgraph_name(subgraph_name)
.error(error)
.error(gateway_timeout_error())
.context(ctx)
.build())
}
Err(err) if err.is::<Overloaded>() => {
// TODO add metrics
let error = graphql::Error::builder()
.message("Your request has been rate limited")
.extension_code("REQUEST_RATE_LIMITED")
.build();
Ok(SubgraphResponse::error_builder()
.status_code(StatusCode::SERVICE_UNAVAILABLE)
.status_code(StatusCode::TOO_MANY_REQUESTS)
.subgraph_name(subgraph_name)
.error(error)
.error(rate_limit_error())
.context(ctx)
.build())
}
Err(err) => Err(err),
_ => response
}
}
},
Expand Down Expand Up @@ -566,6 +532,27 @@ impl TrafficShaping {
}
}

/// GraphQL error emitted when the concurrency limiter rejects a request.
///
/// Surfaced by the router service's `load_shed` handling with a
/// `SERVICE_UNAVAILABLE` status code.
fn concurrency_limit_error() -> graphql::Error {
    let message = "Your request has been concurrency limited";
    let code = "REQUEST_CONCURRENCY_LIMITED";
    graphql::Error::builder()
        .message(message)
        .extension_code(code)
        .build()
}

/// GraphQL error emitted when a request exceeds the configured timeout.
///
/// Surfaced alongside a `GATEWAY_TIMEOUT` status code at both the router
/// and subgraph levels.
fn gateway_timeout_error() -> graphql::Error {
    let message = "Your request has been timed out";
    let code = "GATEWAY_TIMEOUT";
    graphql::Error::builder()
        .message(message)
        .extension_code(code)
        .build()
}

/// GraphQL error emitted when the rate limiter rejects a request.
///
/// Surfaced alongside a `TOO_MANY_REQUESTS` (429) status code at both the
/// router and subgraph levels.
fn rate_limit_error() -> graphql::Error {
    let message = "Your request has been rate limited";
    let code = "REQUEST_RATE_LIMITED";
    graphql::Error::builder()
        .message(message)
        .extension_code(code)
        .build()
}

register_private_plugin!("apollo", "traffic_shaping", TrafficShaping);

#[cfg(test)]
Expand All @@ -588,6 +575,8 @@ mod test {
use serde_json_bytes::ByteString;
use serde_json_bytes::Value;
use serde_json_bytes::json;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tower::Service;

use super::*;
Expand Down Expand Up @@ -1066,7 +1055,7 @@ mod test {
.await
.expect("it responded");

assert_eq!(StatusCode::SERVICE_UNAVAILABLE, response.response.status());
assert_eq!(StatusCode::TOO_MANY_REQUESTS, response.response.status());

tokio::time::sleep(Duration::from_millis(300)).await;

Expand Down Expand Up @@ -1197,7 +1186,7 @@ mod test {
.call(RouterRequest::fake_builder().build().unwrap())
.await
.unwrap();
assert_eq!(StatusCode::SERVICE_UNAVAILABLE, response.response.status());
assert_eq!(StatusCode::TOO_MANY_REQUESTS, response.response.status());
let j: serde_json::Value = serde_json::from_slice(
&router::body::into_bytes(response.response)
.await
Expand Down Expand Up @@ -1260,4 +1249,44 @@ mod test {
.expect("our body is valid json");
assert_eq!("Your request has been timed out", j["errors"][0]["message"]);
}

// Verifies that a timed-out request and a rate-limited request produce
// distinct status codes (504 GATEWAY_TIMEOUT vs. 429 TOO_MANY_REQUESTS),
// rather than both collapsing into a single error code.
#[tokio::test(flavor = "multi_thread")]
async fn it_raises_different_errors_for_timeouts_and_rate_limits() {
// expected behavior: the first request sent will timeout, the second request will return
// immediately due to the ratelimit
// capacity 1 per 100ms admits only one of the two concurrent requests; the
// 150ms timeout is shorter than the backend's 500ms stall below, so the
// admitted request is guaranteed to time out.
let config: serde_json::Value = serde_yaml::from_str(
r#"
router:
global_rate_limit:
capacity: 1
interval: 100ms
timeout: 150ms
"#,
)
.unwrap();

let plugin = get_traffic_shaping_plugin(&config).await;
// Stub backend that stalls for 500ms — well past the 150ms timeout — before
// answering, so the admitted request can never complete in time.
let svc = ServiceBuilder::new()
.service_fn(move |_req: router::Request| async {
sleep(Duration::from_millis(500)).await;
RouterResponse::fake_builder().build()
})
.boxed();

let mut router_service = plugin.router_service(svc);

// Fire both requests concurrently so they contend for the single rate-limit slot.
let mut tasks = JoinSet::new();
for _ in 0..2 {
let request = RouterRequest::fake_builder().build().unwrap();
tasks.spawn(router_service.ready().await.unwrap().call(request));
}

// NOTE(review): this relies on join_all yielding results in completion order —
// the rate-limited request fails fast, the timed-out one only after ~150ms.
let mut results = tasks.join_all().await.into_iter();

// The rejected request completes first with 429.
let response = results.next().unwrap().unwrap().response;
assert_eq!(StatusCode::TOO_MANY_REQUESTS, response.status());

// The admitted request completes second with 504 once the timeout fires.
let response = results.next().unwrap().unwrap().response;
assert_eq!(StatusCode::GATEWAY_TIMEOUT, response.status());
}
}
4 changes: 2 additions & 2 deletions apollo-router/tests/integration/traffic_shaping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,12 @@ async fn test_router_rate_limit() -> Result<(), BoxError> {
assert_yaml_snapshot!(response);

let (_, response) = router.execute_default_query().await;
assert_eq!(response.status(), 503);
assert_eq!(response.status(), 429);
let response = response.text().await?;
assert!(response.contains("REQUEST_RATE_LIMITED"));
assert_yaml_snapshot!(response);

router.assert_metrics_contains(r#"http_server_request_duration_seconds_count{error_type="Service Unavailable",http_request_method="POST",http_response_status_code="503""#, None).await;
router.assert_metrics_contains(r#"http_server_request_duration_seconds_count{error_type="Too Many Requests",http_request_method="POST",http_response_status_code="429""#, None).await;

router.graceful_shutdown().await;
Ok(())
Expand Down