diff --git a/.changesets/fix_caroline_redis_conn_leak.md b/.changesets/fix_caroline_redis_conn_leak.md new file mode 100644 index 0000000000..4247ee0e3d --- /dev/null +++ b/.changesets/fix_caroline_redis_conn_leak.md @@ -0,0 +1,9 @@ +### Fix Redis connection leak ([PR #7319](https://github.com/apollographql/router/pull/7319)) + +The router performs a 'hot reload' whenever it detects a schema update. During this reload, it effectively instantiates a new internal router, warms it up (optional), redirects all traffic to this new router, and drops the old internal router. + +This change fixes a bug in that drop process where the Redis connections are never told to terminate, even though the Redis client pool is dropped. This leads to an ever-increasing number of inactive Redis connections, which eats up memory. + +It also adds a new up-down counter metric, `apollo.router.cache.redis.connections`, to track the number of open Redis connections. This metric includes a `kind` label to discriminate between different Redis connection pools, which mirrors the `kind` label on other cache metrics (i.e. `apollo.router.cache.hit.time`). + +By [@carodewig](https://github.com/carodewig) in https://github.com/apollographql/router/pull/7319 diff --git a/apollo-router/src/cache/redis.rs b/apollo-router/src/cache/redis.rs index b6d5be6496..05c8db29f1 100644 --- a/apollo-router/src/cache/redis.rs +++ b/apollo-router/src/cache/redis.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::fmt; +use std::ops::Deref; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -51,9 +52,39 @@ pub(crate) struct RedisValue(pub(crate) V) where V: ValueType; +/// `DropSafeRedisPool` is a wrapper for `fred::prelude::RedisPool` which closes the pool's Redis +/// connections when it is dropped. +// +// Dev notes: +// * the inner `RedisPool` must be wrapped in an `Arc` because closing the connections happens +// in a spawned async task. 
+// * why not just implement this within `Drop` for `RedisCacheStorage`? Because `RedisCacheStorage` +// is cloned frequently throughout the router, and we don't want to close the connections +// when each clone is dropped, only when the last instance is dropped. +struct DropSafeRedisPool(Arc); +impl Deref for DropSafeRedisPool { + type Target = RedisPool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Drop for DropSafeRedisPool { + fn drop(&mut self) { + let inner = self.0.clone(); + tokio::spawn(async move { + let result = inner.quit().await; + if let Err(err) = result { + tracing::warn!("Caught error while closing unused Redis connections: {err:?}"); + } + }); + } +} + #[derive(Clone)] pub(crate) struct RedisCacheStorage { - inner: Arc, + inner: Arc, namespace: Option>, pub(crate) ttl: Option, is_cluster: bool, @@ -144,7 +175,7 @@ where } impl RedisCacheStorage { - pub(crate) async fn new(config: RedisCache) -> Result { + pub(crate) async fn new(config: RedisCache, caller: &'static str) -> Result { let url = Self::preprocess_urls(config.urls)?; let mut client_config = RedisConfig::from_url(url.as_str())?; let is_cluster = url.scheme() == "redis-cluster" || url.scheme() == "rediss-cluster"; @@ -180,6 +211,7 @@ impl RedisCacheStorage { config.ttl, config.reset_ttl, is_cluster, + caller, ) .await } @@ -199,10 +231,12 @@ impl RedisCacheStorage { None, false, false, + "test", ) .await } + #[allow(clippy::too_many_arguments)] async fn create_client( client_config: RedisConfig, timeout: Duration, @@ -211,6 +245,7 @@ impl RedisCacheStorage { ttl: Option, reset_ttl: bool, is_cluster: bool, + caller: &'static str, ) -> Result { let pooled_client = RedisPool::new( client_config, @@ -229,6 +264,14 @@ impl RedisCacheStorage { let mut error_rx = client.error_rx(); let mut reconnect_rx = client.reconnect_rx(); + i64_up_down_counter_with_unit!( + "apollo.router.cache.redis.connections", + "Number of Redis connections", + "{connection}", + 1, + kind = caller + 
); + tokio::spawn(async move { while let Ok(error) = error_rx.recv().await { tracing::error!("Client disconnected with error: {:?}", error); @@ -238,6 +281,13 @@ impl RedisCacheStorage { while reconnect_rx.recv().await.is_ok() { tracing::info!("Redis client reconnected."); } + i64_up_down_counter_with_unit!( + "apollo.router.cache.redis.connections", + "Number of Redis connections", + "{connection}", + -1, + kind = caller + ); }); } @@ -250,7 +300,7 @@ impl RedisCacheStorage { tracing::trace!("redis connection established"); Ok(Self { - inner: Arc::new(pooled_client), + inner: Arc::new(DropSafeRedisPool(Arc::new(pooled_client))), namespace: namespace.map(Arc::new), ttl, is_cluster, diff --git a/apollo-router/src/cache/storage.rs b/apollo-router/src/cache/storage.rs index c75b662674..288622dc07 100644 --- a/apollo-router/src/cache/storage.rs +++ b/apollo-router/src/cache/storage.rs @@ -80,7 +80,7 @@ where inner: Arc::new(Mutex::new(LruCache::new(max_capacity))), redis: if let Some(config) = config { let required_to_start = config.required_to_start; - match RedisCacheStorage::new(config).await { + match RedisCacheStorage::new(config, caller).await { Err(e) => { tracing::error!( cache = caller, diff --git a/apollo-router/src/metrics/mod.rs b/apollo-router/src/metrics/mod.rs index b84549ee89..749fdc33f3 100644 --- a/apollo-router/src/metrics/mod.rs +++ b/apollo-router/src/metrics/mod.rs @@ -1524,6 +1524,8 @@ pub(crate) type MetricFuture = Pin::Out #[cfg(test)] pub(crate) trait FutureMetricsExt { + /// See [dev-docs/metrics.md](https://github.com/apollographql/router/blob/dev/dev-docs/metrics.md#testing-async) + /// for details on this function. 
fn with_metrics( self, ) -> tokio::task::futures::TaskLocalFuture< diff --git a/apollo-router/src/plugins/cache/entity.rs b/apollo-router/src/plugins/cache/entity.rs index a4b378727e..e29979db88 100644 --- a/apollo-router/src/plugins/cache/entity.rs +++ b/apollo-router/src/plugins/cache/entity.rs @@ -212,7 +212,7 @@ impl Plugin for EntityCache { let required_to_start = redis_config.required_to_start; // we need to explicitly disable TTL reset because it is managed directly by this plugin redis_config.reset_ttl = false; - all = match RedisCacheStorage::new(redis_config).await { + all = match RedisCacheStorage::new(redis_config, "entity").await { Ok(storage) => Some(storage), Err(e) => { tracing::error!( @@ -234,7 +234,7 @@ impl Plugin for EntityCache { // we need to explicitly disable TTL reset because it is managed directly by this plugin let mut redis_config = redis.clone(); redis_config.reset_ttl = false; - let storage = match RedisCacheStorage::new(redis_config).await { + let storage = match RedisCacheStorage::new(redis_config, "entity").await { Ok(storage) => Some(storage), Err(e) => { tracing::error!( diff --git a/apollo-router/tests/integration/fixtures/redis_connection_closure.router.yaml b/apollo-router/tests/integration/fixtures/redis_connection_closure.router.yaml new file mode 100644 index 0000000000..6e7449ae65 --- /dev/null +++ b/apollo-router/tests/integration/fixtures/redis_connection_closure.router.yaml @@ -0,0 +1,16 @@ +supergraph: + query_planning: + cache: + redis: + required_to_start: true + urls: + - redis://localhost:6379 + ttl: 1s + pool_size: 4 +telemetry: + exporters: + metrics: + prometheus: + listen: 127.0.0.1:4000 + enabled: true + path: /metrics diff --git a/apollo-router/tests/integration/redis.rs b/apollo-router/tests/integration/redis.rs index f45b0599c7..a5c3966eff 100644 --- a/apollo-router/tests/integration/redis.rs +++ b/apollo-router/tests/integration/redis.rs @@ -1115,3 +1115,29 @@ async fn 
test_redis_query_plan_config_update(updated_config: &str, new_cache_key .assert_redis_cache_contains(new_cache_key, Some(starting_key)) .await; } + +#[tokio::test(flavor = "multi_thread")] +async fn test_redis_connections_are_closed_on_router_reload() { + if !graph_os_enabled() { + return; + } + + let router_config = include_str!("fixtures/redis_connection_closure.router.yaml"); + let mut router = IntegrationTest::builder() + .config(router_config) + .build() + .await; + + router.start().await; + router.assert_started().await; + + let expected_metric = r#"apollo_router_cache_redis_connections{kind="query planner",otel_scope_name="apollo/router"} 4"#; + router.assert_metrics_contains(expected_metric, None).await; + + // check that reloading the schema yields the same number of redis connections + let new_router_config = format!("{router_config}\ninclude_subgraph_errors:\n all: true"); + router.update_config(&new_router_config).await; + router.assert_reloaded().await; + + router.assert_metrics_contains(expected_metric, None).await; +} diff --git a/dev-docs/metrics.md b/dev-docs/metrics.md index f41fd72994..e0944a979d 100644 --- a/dev-docs/metrics.md +++ b/dev-docs/metrics.md @@ -183,6 +183,32 @@ Make sure to use `.with_metrics()` method on the async block to ensure that the } ``` +Note: this relies on metrics being updated within the same thread. Metrics that are updated from multiple threads will +not be collected correctly. + +```rust +#[tokio::test] +async fn test_spawned_metric_resolution() { + async { + u64_counter!("apollo.router.test", "metric", 1); + assert_counter!("apollo.router.test", 1); + + tokio::spawn(async move { + u64_counter!("apollo.router.test", "metric", 2); + }) + .await + .unwrap(); + + // In real operations, this metric resolves to a total of 3! + // However, in testing, it will resolve to 1, because the second incrementation happens in another thread. 
+ // assert_counter!("apollo.router.test", 3); + assert_counter!("apollo.router.test", 1); + } + .with_metrics() + .await; +} +``` + ## Callsite instrument caching When using the new metrics macros a reference to an instrument is cached to ensure that the meter provider does not have to be queried over and over.