Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ There are situations where comments and whitespace are not preserved. This may b

By [@bryncooke](https://github.com/bryncooke) in https://github.com/apollographql/router/pull/2116

### *Experimental* subgraph request retry ([Issue #338](https://github.com/apollographql/router/issues/338), [Issue #1956](https://github.com/apollographql/router/issues/1956))

Implements subgraph request retries, using Finagle's retry buckets algorithm:
- it defines a minimal number of retries per second (`min_per_sec`, default is 10 retries per second), to
bootstrap the system or for low traffic deployments
- for each successful request, we add a "token" to the bucket, those tokens expire after `ttl` (default: 10 seconds)
- the number of available additional retries is a part of the number of tokens, defined by `retry_percent` (default is 0.2)

This is activated in the `traffic_shaping` plugin, either globally or per subgraph:

```yaml
traffic_shaping:
all:
experimental_retry:
min_per_sec: 10
ttl: 10s
retry_percent: 0.2
subgraphs:
accounts:
experimental_retry:
min_per_sec: 20
```

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2006

## 🐛 Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2080,6 +2080,31 @@ expression: "&schema"
"type": "boolean",
"nullable": true
},
"experimental_retry": {
"type": "object",
"properties": {
"min_per_sec": {
"description": "minimum rate of retries allowed to accomodate clients that have just started issuing requests, or clients that do not issue many requests per window. The default value is 10",
"type": "integer",
"format": "uint32",
"minimum": 0.0,
"nullable": true
},
"retry_percent": {
"description": "percentage of calls to deposit that can be retried. This is in addition to any retries allowed for via min_per_sec. Must be between 0 and 1000, default value is 0.2",
"type": "number",
"format": "float",
"nullable": true
},
"ttl": {
"description": "how long a single deposit should be considered. Must be between 1 and 60 seconds, default value is 10 seconds",
"default": null,
"type": "string"
}
},
"additionalProperties": false,
"nullable": true
},
"global_rate_limit": {
"description": "Enable global rate limiting",
"type": "object",
Expand Down Expand Up @@ -2189,6 +2214,31 @@ expression: "&schema"
"type": "boolean",
"nullable": true
},
"experimental_retry": {
"type": "object",
"properties": {
"min_per_sec": {
"description": "minimum rate of retries allowed to accomodate clients that have just started issuing requests, or clients that do not issue many requests per window. The default value is 10",
"type": "integer",
"format": "uint32",
"minimum": 0.0,
"nullable": true
},
"retry_percent": {
"description": "percentage of calls to deposit that can be retried. This is in addition to any retries allowed for via min_per_sec. Must be between 0 and 1000, default value is 0.2",
"type": "number",
"format": "float",
"nullable": true
},
"ttl": {
"description": "how long a single deposit should be considered. Must be between 1 and 60 seconds, default value is 10 seconds",
"default": null,
"type": "string"
}
},
"additionalProperties": false,
"nullable": true
},
"global_rate_limit": {
"description": "Enable global rate limiting",
"type": "object",
Expand Down
92 changes: 66 additions & 26 deletions apollo-router/src/plugins/traffic_shaping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@

mod deduplication;
mod rate;
mod retry;
mod timeout;

use std::collections::HashMap;
use std::num::NonZeroU64;
use std::pin::Pin;
use std::sync::Mutex;
use std::time::Duration;

use futures::future::BoxFuture;
use http::header::ACCEPT_ENCODING;
use http::header::CONTENT_ENCODING;
use http::HeaderValue;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::retry::Retry;
use tower::util::Either;
use tower::util::Oneshot;
use tower::BoxError;
Expand All @@ -32,6 +34,7 @@ use tower::ServiceExt;
use self::deduplication::QueryDeduplicationLayer;
use self::rate::RateLimitLayer;
pub(crate) use self::rate::RateLimited;
use self::retry::RetryPolicy;
pub(crate) use self::timeout::Elapsed;
use self::timeout::TimeoutLayer;
use crate::error::ConfigurationError;
Expand Down Expand Up @@ -64,6 +67,8 @@ struct Shaping {
#[schemars(with = "String", default)]
/// Enable timeout for incoming requests
timeout: Option<Duration>,
// *experimental feature*: Enables request retry
experimental_retry: Option<RetryConfig>,
}

impl Merge for Shaping {
Expand All @@ -79,6 +84,42 @@ impl Merge for Shaping {
.as_ref()
.or(fallback.global_rate_limit.as_ref())
.cloned(),
experimental_retry: self
.experimental_retry
.as_ref()
.or(fallback.experimental_retry.as_ref())
.cloned(),
},
}
}
}

#[derive(PartialEq, Debug, Clone, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
struct RetryConfig {
#[serde(deserialize_with = "humantime_serde::deserialize", default)]
#[schemars(with = "String", default)]
/// how long a single deposit should be considered. Must be between 1 and 60 seconds,
/// default value is 10 seconds
ttl: Option<Duration>,
/// minimum rate of retries allowed to accomodate clients that have just started
/// issuing requests, or clients that do not issue many requests per window. The
/// default value is 10
min_per_sec: Option<u32>,
/// percentage of calls to deposit that can be retried. This is in addition to any
/// retries allowed for via min_per_sec. Must be between 0 and 1000, default value
/// is 0.2
retry_percent: Option<f32>,
}

impl Merge for RetryConfig {
fn merge(&self, fallback: Option<&Self>) -> Self {
match fallback {
None => self.clone(),
Some(fallback) => RetryConfig {
ttl: self.ttl.or(fallback.ttl),
min_per_sec: self.min_per_sec.or(fallback.min_per_sec),
retry_percent: self.retry_percent.or(fallback.retry_percent),
},
}
}
Expand Down Expand Up @@ -234,24 +275,15 @@ impl TrafficShaping {
Error = BoxError,
Future = tower::util::Either<
tower::util::Either<
Pin<
Box<
(dyn futures::Future<
Output = std::result::Result<
subgraph::Response,
Box<
(dyn std::error::Error
+ std::marker::Send
+ std::marker::Sync
+ 'static),
>,
>,
> + std::marker::Send
+ 'static),
>,
>,
BoxFuture<'static, Result<subgraph::Response, BoxError>>,
timeout::future::ResponseFuture<
Oneshot<tower::util::Either<rate::service::RateLimit<S>, S>, subgraph::Request>,
Oneshot<
tower::util::Either<
Retry<RetryPolicy, tower::util::Either<rate::service::RateLimit<S>, S>>,
tower::util::Either<rate::service::RateLimit<S>, S>,
>,
subgraph::Request,
>,
>,
>,
<S as Service<subgraph::Request>>::Future,
Expand Down Expand Up @@ -284,16 +316,24 @@ impl TrafficShaping {
})
.clone()
});

let retry = config.experimental_retry.as_ref().map(|config| {
let retry_policy =
RetryPolicy::new(config.ttl, config.min_per_sec, config.retry_percent);
tower::retry::RetryLayer::new(retry_policy)
});

Either::A(ServiceBuilder::new()
.option_layer(config.deduplicate_query.unwrap_or_default().then(
QueryDeduplicationLayer::default
))
.layer(TimeoutLayer::new(
config
.timeout
.unwrap_or(DEFAULT_TIMEOUT),
.option_layer(config.deduplicate_query.unwrap_or_default().then(
QueryDeduplicationLayer::default
))
.option_layer(rate_limit)
.layer(TimeoutLayer::new(
config
.timeout
.unwrap_or(DEFAULT_TIMEOUT),
))
.option_layer(retry)
.option_layer(rate_limit)
.service(service)
.map_request(move |mut req: SubgraphRequest| {
if let Some(compression) = config.compression {
Expand Down
54 changes: 54 additions & 0 deletions apollo-router/src/plugins/traffic_shaping/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::future;
use std::sync::Arc;
use std::time::Duration;

use tower::retry::budget::Budget;
use tower::retry::Policy;

#[derive(Clone, Default)]
pub(crate) struct RetryPolicy {
budget: Arc<Budget>,
}

impl RetryPolicy {
pub(crate) fn new(
duration: Option<Duration>,
min_per_sec: Option<u32>,
retry_percent: Option<f32>,
) -> Self {
Self {
budget: Arc::new(Budget::new(
duration.unwrap_or_else(|| Duration::from_secs(10)),
min_per_sec.unwrap_or(10),
retry_percent.unwrap_or(0.2),
)),
}
}
}

impl<Req: Clone, Res, E> Policy<Req, Res, E> for RetryPolicy {
type Future = future::Ready<Self>;

fn retry(&self, _req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
match result {
Ok(_) => {
// Treat all `Response`s as success,
// so deposit budget and don't retry...
self.budget.deposit();
None
}
Err(_e) => {
let withdrew = self.budget.withdraw();
if withdrew.is_err() {
return None;
}

Some(future::ready(self.clone()))
}
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
Some(req.clone())
}
}
29 changes: 29 additions & 0 deletions apollo-router/src/services/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,35 @@ impl Request {
}
}

impl Clone for Request {
fn clone(&self) -> Self {
// http::Request is not clonable so we have to rebuild a new one
// we don't use the extensions field for now
let mut builder = http::Request::builder()
.method(self.subgraph_request.method())
.version(self.subgraph_request.version())
.uri(self.subgraph_request.uri());

{
let headers = builder.headers_mut().unwrap();
headers.extend(
self.subgraph_request
.headers()
.iter()
.map(|(name, value)| (name.clone(), value.clone())),
);
}
let subgraph_request = builder.body(self.subgraph_request.body().clone()).unwrap();

Self {
supergraph_request: self.supergraph_request.clone(),
subgraph_request,
operation_kind: self.operation_kind,
context: self.context.clone(),
}
}
}

assert_impl_all!(Response: Send);
#[derive(Debug)]
#[non_exhaustive]
Expand Down
6 changes: 6 additions & 0 deletions docs/source/configuration/traffic-shaping.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The Apollo Router supports the following types of traffic shaping between itself
- The router currently supports `gzip`, `br`, and `deflate`.
- **Global rate limiting** - If you want to rate limit requests to subgraphs or to the router itself.
- **Timeout**: - Set a timeout to subgraphs and router requests.
- **Request Retry**: - retry subgraph requests if they fail due to network errors. This implements Finagle's retry buckets mechanism. **Experimental feature**: retry configuration might change in the future.

Each of these optimizations can reduce network bandwidth and CPU usage for your subgraphs.

Expand All @@ -38,6 +39,11 @@ traffic_shaping:
capacity: 10
interval: 5s # Must not be greater than 18_446_744_073_709_551_615 milliseconds and not less than 0 milliseconds
timeout: 50s # If a request to the subgraph 'products' takes more than 50secs then cancel the request (30 sec by default)
experimental_retry:
min_per_sec: 10 # minimal number of retries per second (`min_per_sec`, default is 10 retries per second)
ttl: 10s # for each successful request, we register a token, that expires according to this option (default: 10s)
retry_percent: 0.2 # defines the proportion of available retries to the current number of tokens

```

Any configuration under the `subgraphs` key takes precedence over configuration under the `all` key. In the example above, query deduplication is enabled for all subgraphs _except_ the `products` subgraph.