Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changesets/fix_caroline_redis_conn_leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### Fix Redis connection leak ([PR #7319](https://github.com/apollographql/router/pull/7319))

The router performs a 'hot reload' whenever it detects a schema update. During this reload, it effectively instantiates a new internal router, warms it up (optional), redirects all traffic to this new router, and drops the old internal router.

This change fixes a bug in that drop process where the Redis connections are never told to terminate, even though the Redis client pool is dropped. This leads to an ever-increasing number of inactive Redis connections, which eats up memory.

It also adds a new up-down counter metric, `apollo.router.cache.redis.connections`, to track the number of open Redis connections. This metric includes a `kind` label to discriminate between different Redis connection pools, which mirrors the `kind` label on other cache metrics (e.g. `apollo.router.cache.hit.time`).

By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/7319
56 changes: 53 additions & 3 deletions apollo-router/src/cache/redis.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fmt;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -51,9 +52,39 @@ pub(crate) struct RedisValue<V>(pub(crate) V)
where
V: ValueType;

/// Wrapper around `fred::prelude::RedisPool` that shuts down the pool's Redis
/// connections when the wrapper itself is dropped.
//
// Implementation notes:
// * closing the connections happens inside a spawned async task, so the
//   wrapped `RedisPool` lives in an `Arc` that the task can hold onto.
// * this lives on its own type rather than as `Drop for RedisCacheStorage`
//   because `RedisCacheStorage` is cloned all over the router; the
//   connections must be closed only when the final instance goes away,
//   not every time a clone is dropped.
struct DropSafeRedisPool(Arc<RedisPool>);

impl Deref for DropSafeRedisPool {
    type Target = RedisPool;

    fn deref(&self) -> &Self::Target {
        let Self(pool) = self;
        pool
    }
}

impl Drop for DropSafeRedisPool {
    /// Asks the pooled Redis clients to close their connections once the
    /// last owner of this wrapper is dropped.
    fn drop(&mut self) {
        // `quit()` is async, so it must run on a spawned task; clone the
        // `Arc` so the task can outlive `self`.
        let inner = self.0.clone();
        // `tokio::spawn` panics when called outside a runtime, and `drop`
        // can run during process teardown after the runtime has shut down.
        // A panic inside a destructor can abort the process, so check for a
        // runtime with `try_current()` instead of spawning unconditionally.
        match tokio::runtime::Handle::try_current() {
            Ok(handle) => {
                handle.spawn(async move {
                    let result = inner.quit().await;
                    if let Err(err) = result {
                        tracing::warn!("Caught error while closing unused Redis connections: {err:?}");
                    }
                });
            }
            Err(_) => {
                // No runtime: connections will be torn down by process exit.
                tracing::warn!(
                    "No tokio runtime available to close unused Redis connections cleanly"
                );
            }
        }
    }
}

#[derive(Clone)]
pub(crate) struct RedisCacheStorage {
inner: Arc<RedisPool>,
inner: Arc<DropSafeRedisPool>,
namespace: Option<Arc<String>>,
pub(crate) ttl: Option<Duration>,
is_cluster: bool,
Expand Down Expand Up @@ -144,7 +175,7 @@ where
}

impl RedisCacheStorage {
pub(crate) async fn new(config: RedisCache) -> Result<Self, BoxError> {
pub(crate) async fn new(config: RedisCache, caller: &'static str) -> Result<Self, BoxError> {
let url = Self::preprocess_urls(config.urls)?;
let mut client_config = RedisConfig::from_url(url.as_str())?;
let is_cluster = url.scheme() == "redis-cluster" || url.scheme() == "rediss-cluster";
Expand Down Expand Up @@ -180,6 +211,7 @@ impl RedisCacheStorage {
config.ttl,
config.reset_ttl,
is_cluster,
caller,
)
.await
}
Expand All @@ -199,10 +231,12 @@ impl RedisCacheStorage {
None,
false,
false,
"test",
)
.await
}

#[allow(clippy::too_many_arguments)]
async fn create_client(
client_config: RedisConfig,
timeout: Duration,
Expand All @@ -211,6 +245,7 @@ impl RedisCacheStorage {
ttl: Option<Duration>,
reset_ttl: bool,
is_cluster: bool,
caller: &'static str,
) -> Result<Self, BoxError> {
let pooled_client = RedisPool::new(
client_config,
Expand All @@ -229,6 +264,14 @@ impl RedisCacheStorage {
let mut error_rx = client.error_rx();
let mut reconnect_rx = client.reconnect_rx();

i64_up_down_counter_with_unit!(
"apollo.router.cache.redis.connections",
"Number of Redis connections",
"{connection}",
1,
kind = caller
);

tokio::spawn(async move {
while let Ok(error) = error_rx.recv().await {
tracing::error!("Client disconnected with error: {:?}", error);
Expand All @@ -238,6 +281,13 @@ impl RedisCacheStorage {
while reconnect_rx.recv().await.is_ok() {
tracing::info!("Redis client reconnected.");
}
i64_up_down_counter_with_unit!(
"apollo.router.cache.redis.connections",
"Number of Redis connections",
"{connection}",
-1,
kind = caller
);
});
}

Expand All @@ -250,7 +300,7 @@ impl RedisCacheStorage {

tracing::trace!("redis connection established");
Ok(Self {
inner: Arc::new(pooled_client),
inner: Arc::new(DropSafeRedisPool(Arc::new(pooled_client))),
namespace: namespace.map(Arc::new),
ttl,
is_cluster,
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/src/cache/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ where
inner: Arc::new(Mutex::new(LruCache::new(max_capacity))),
redis: if let Some(config) = config {
let required_to_start = config.required_to_start;
match RedisCacheStorage::new(config).await {
match RedisCacheStorage::new(config, caller).await {
Err(e) => {
tracing::error!(
cache = caller,
Expand Down
2 changes: 2 additions & 0 deletions apollo-router/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1524,6 +1524,8 @@ pub(crate) type MetricFuture<T> = Pin<Box<dyn Future<Output = <T as Future>::Out

#[cfg(test)]
pub(crate) trait FutureMetricsExt<T> {
/// See [dev-docs/metrics.md](https://github.com/apollographql/router/blob/dev/dev-docs/metrics.md#testing-async)
/// for details on this function.
fn with_metrics(
self,
) -> tokio::task::futures::TaskLocalFuture<
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/src/plugins/cache/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl Plugin for EntityCache {
let required_to_start = redis_config.required_to_start;
// we need to explicitly disable TTL reset because it is managed directly by this plugin
redis_config.reset_ttl = false;
all = match RedisCacheStorage::new(redis_config).await {
all = match RedisCacheStorage::new(redis_config, "entity").await {
Ok(storage) => Some(storage),
Err(e) => {
tracing::error!(
Expand All @@ -234,7 +234,7 @@ impl Plugin for EntityCache {
// we need to explicitly disable TTL reset because it is managed directly by this plugin
let mut redis_config = redis.clone();
redis_config.reset_ttl = false;
let storage = match RedisCacheStorage::new(redis_config).await {
let storage = match RedisCacheStorage::new(redis_config, "entity").await {
Ok(storage) => Some(storage),
Err(e) => {
tracing::error!(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Router config fixture for the Redis connection-closure integration test:
# a Redis-backed query-plan cache with a fixed pool size, plus a Prometheus
# endpoint so the test can scrape the connection-count metric.
supergraph:
  query_planning:
    cache:
      redis:
        # Fail startup if Redis is unreachable so the test fails fast
        # instead of silently running without a Redis-backed cache.
        required_to_start: true
        urls:
          - redis://localhost:6379
        ttl: 1s
        # Fixed pool of 4 connections — the test asserts this exact count
        # via the apollo.router.cache.redis.connections metric.
        pool_size: 4
telemetry:
  exporters:
    metrics:
      prometheus:
        listen: 127.0.0.1:4000
        enabled: true
        path: /metrics
26 changes: 26 additions & 0 deletions apollo-router/tests/integration/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1115,3 +1115,29 @@ async fn test_redis_query_plan_config_update(updated_config: &str, new_cache_key
.assert_redis_cache_contains(new_cache_key, Some(starting_key))
.await;
}

/// Regression test: a hot reload of the router must not leak Redis
/// connections. The fixture pins the pool size at 4, so the connection
/// gauge should read exactly 4 both before and after a reload.
#[tokio::test(flavor = "multi_thread")]
async fn test_redis_connections_are_closed_on_router_reload() {
    // Requires a GraphOS-enabled environment; skip otherwise.
    if !graph_os_enabled() {
        return;
    }

    let config = include_str!("fixtures/redis_connection_closure.router.yaml");
    let expected_metric = r#"apollo_router_cache_redis_connections{kind="query planner",otel_scope_name="apollo/router"} 4"#;

    let mut router = IntegrationTest::builder().config(config).build().await;
    router.start().await;
    router.assert_started().await;

    // Baseline: the whole pool is connected.
    router.assert_metrics_contains(expected_metric, None).await;

    // Trigger a hot reload with a harmless config addition, then verify the
    // connection count is unchanged — i.e. the old pool's connections were
    // closed rather than leaked.
    let reloaded_config = format!("{}\ninclude_subgraph_errors:\n all: true", config);
    router.update_config(&reloaded_config).await;
    router.assert_reloaded().await;

    router.assert_metrics_contains(expected_metric, None).await;
}
26 changes: 26 additions & 0 deletions dev-docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,32 @@ Make sure to use `.with_metrics()` method on the async block to ensure that the
}
```

Note: this approach relies on metrics being updated on the same thread as the test. Metrics that are updated from other threads (for example, inside spawned tasks) will
not be collected correctly.

```rust
#[tokio::test]
async fn test_spawned_metric_resolution() {
async {
u64_counter!("apollo.router.test", "metric", 1);
assert_counter!("apollo.router.test", 1);

tokio::spawn(async move {
u64_counter!("apollo.router.test", "metric", 2);
})
.await
.unwrap();

// In real operations, this metric resolves to a total of 3!
// However, in testing, it will resolve to 1, because the second increment happens on another thread.
// assert_counter!("apollo.router.test", 3);
assert_counter!("apollo.router.test", 1);
}
.with_metrics()
.await;
}
```

## Callsite instrument caching

When using the new metrics macros a reference to an instrument is cached to ensure that the meter provider does not have to be queried over and over.
Expand Down