diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 19abfa8f58..9dc3229e79 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -283,6 +283,14 @@ to validate the data sent back to the client. Those query shapes were invalid fo By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2102 +### *Experimental* 🥼 APQ and query planner Redis caching fixes ([PR #2176](https://github.com/apollographql/router/pull/2176)) + +* use a null byte as separator in Redis keys +* handle Redis connection errors +* mark APQ and query plan caching as license key functionality + +By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2176 + ## 🛠 Maintenance ### Verify that deferred fragment acts as a boundary for nullability rules ([Issue #2169](https://github.com/apollographql/router/issues/2169)) diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index 5fcf3a3758..53107202f8 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -166,6 +166,7 @@ impl HttpServerFactory for AxumHttpServerFactory { let apq = APQLayer::with_cache( DeduplicatingCache::from_configuration( &configuration.supergraph.apq.experimental_cache, + "APQ", ) .await, ); diff --git a/apollo-router/src/cache/mod.rs b/apollo-router/src/cache/mod.rs index e98a664ed6..5ade794dc3 100644 --- a/apollo-router/src/cache/mod.rs +++ b/apollo-router/src/cache/mod.rs @@ -9,6 +9,8 @@ use self::storage::CacheStorage; use self::storage::KeyType; use self::storage::ValueType; +#[cfg(feature = "experimental_cache")] +mod redis; pub(crate) mod storage; type WaitMap = Arc>>>; @@ -26,24 +28,33 @@ where K: KeyType + 'static, V: ValueType + 'static, { + #[cfg(test)] pub(crate) async fn new() -> Self { - Self::with_capacity(DEFAULT_CACHE_CAPACITY, None).await + Self::with_capacity(DEFAULT_CACHE_CAPACITY, None, "test").await } - pub(crate) async fn with_capacity(capacity: usize, redis_urls: Option>) -> Self { + pub(crate) async fn with_capacity( + capacity: usize, + redis_urls: Option>, + caller: &str, + ) -> Self { Self { wait_map: Arc::new(Mutex::new(HashMap::new())), - storage: CacheStorage::new(capacity, redis_urls).await, + storage: CacheStorage::new(capacity, redis_urls, caller).await, } } - pub(crate) async fn from_configuration(config: &crate::configuration::Cache) -> Self { + pub(crate) async fn from_configuration( + config: &crate::configuration::Cache, + caller: &str, + ) -> Self { Self::with_capacity( config.in_memory.limit, #[cfg(feature = "experimental_cache")] config.redis.as_ref().map(|c| c.urls.clone()), #[cfg(not(feature = "experimental_cache"))] None, + caller, ) .await } @@ -198,7 +209,7 @@ mod tests { #[tokio::test] async fn example_cache_usage() { let k = "key".to_string(); - let cache = DeduplicatingCache::with_capacity(1, None).await; + let cache = DeduplicatingCache::with_capacity(1, None, "test").await; let entry = cache.get(&k).await; @@ -215,7 +226,7 @@ mod tests { #[test(tokio::test)] async fn it_should_enforce_cache_limits() { let cache: DeduplicatingCache = - DeduplicatingCache::with_capacity(13, None).await; + DeduplicatingCache::with_capacity(13, None, "test").await; for i in 0..14 { let entry = cache.get(&i).await; @@ -238,7 +249,7 @@ mod tests { mock.expect_retrieve().times(1).return_const(1usize); let cache: DeduplicatingCache = - DeduplicatingCache::with_capacity(10, None).await; + DeduplicatingCache::with_capacity(10, None, "test").await; // Let's trigger 100 concurrent gets of the same value and ensure only // one delegated retrieve is made diff --git a/apollo-router/src/cache/redis.rs b/apollo-router/src/cache/redis.rs new file mode 100644 index 0000000000..e5294ff008 --- /dev/null +++ b/apollo-router/src/cache/redis.rs @@ -0,0 +1,144 @@ +// This entire file is license key functionality + +use std::fmt; +use std::sync::Arc; + +use redis::AsyncCommands; +use redis::FromRedisValue; +use redis::RedisResult; +use redis::RedisWrite; +use redis::ToRedisArgs; +use redis_cluster_async::Client; +use redis_cluster_async::Connection; +use tokio::sync::Mutex; + +use super::KeyType; +use super::ValueType; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub(crate) struct RedisKey(pub(crate) K) +where + K: KeyType; + +#[derive(Clone, Debug)] +pub(crate) struct RedisValue(pub(crate) V) +where + V: ValueType; + +#[derive(Clone)] +pub(crate) struct RedisCacheStorage { + inner: Arc>, +} + +fn get_type_of(_: &T) -> &'static str { + std::any::type_name::() +} + +impl fmt::Display for RedisKey +where + K: KeyType, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl ToRedisArgs for RedisKey +where + K: KeyType, +{ + fn write_redis_args(&self, out: &mut W) + where + W: ?Sized + RedisWrite, + { + out.write_arg_fmt(self); + } +} + +impl fmt::Display for RedisValue +where + V: ValueType, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}|{:?}", get_type_of(&self.0), self.0) + } +} + +impl ToRedisArgs for RedisValue +where + V: ValueType, +{ + fn write_redis_args(&self, out: &mut W) + where + W: ?Sized + RedisWrite, + { + let v = serde_json::to_vec(&self.0) + .expect("JSON serialization should not fail for redis values"); + out.write_arg(&v); + } +} + +impl FromRedisValue for RedisValue +where + V: ValueType, +{ + fn from_redis_value(v: &redis::Value) -> RedisResult { + match v { + redis::Value::Bulk(bulk_data) => { + for entry in bulk_data { + tracing::trace!("entry: {:?}", entry); + } + Err(redis::RedisError::from(( + redis::ErrorKind::TypeError, + "the data is the wrong type", + ))) + } + redis::Value::Data(v) => serde_json::from_slice(v).map(RedisValue).map_err(|e| { + redis::RedisError::from(( + redis::ErrorKind::TypeError, + "can't deserialize from JSON", + e.to_string(), + )) + }), + res => Err(redis::RedisError::from(( + redis::ErrorKind::TypeError, + "the data is the wrong type", + format!("{:?}", res), + ))), + } + } +} + +impl RedisCacheStorage { + pub(crate) async fn new(urls: Vec) -> Result { + let client = Client::open(urls)?; + let connection = client.get_connection().await?; + + tracing::trace!("redis connection established"); + Ok(Self { + inner: Arc::new(Mutex::new(connection)), + }) + } + + pub(crate) async fn get( + &self, + key: RedisKey, + ) -> Option> { + tracing::trace!("getting from redis: {:?}", key); + let mut guard = self.inner.lock().await; + guard.get(key).await.ok() + } + + pub(crate) async fn insert( + &self, + key: RedisKey, + value: RedisValue, + ) { + tracing::trace!("inserting into redis: {:?}, {:?}", key, value); + let mut guard = self.inner.lock().await; + let r = guard + .set::, RedisValue, redis::Value>(key, value) + .await; + tracing::trace!("insert result {:?}", r); + } +} diff --git a/apollo-router/src/cache/storage.rs b/apollo-router/src/cache/storage.rs index 320ccbd6c8..b719c1988b 100644 --- a/apollo-router/src/cache/storage.rs +++ b/apollo-router/src/cache/storage.rs @@ -1,3 +1,5 @@ +// This entire file is license key functionality + use std::fmt; use std::hash::Hash; use std::sync::Arc; @@ -7,6 +9,9 @@ use serde::de::DeserializeOwned; use serde::Serialize; use tokio::sync::Mutex; +#[cfg(feature = "experimental_cache")] +use super::redis::*; + pub(crate) trait KeyType: Clone + fmt::Debug + fmt::Display + Hash + Eq + Send + Sync { @@ -34,9 +39,6 @@ where // It has the functions it needs already } -#[cfg(feature = "experimental_cache")] -use redis_storage::*; - // placeholder storage module // // this will be replaced by the multi level (in memory + redis/memcached) once we find @@ -53,12 +55,26 @@ where K: KeyType, V: ValueType, { - pub(crate) async fn new(max_capacity: usize, _redis_urls: Option>) -> Self { + pub(crate) async fn new( + max_capacity: usize, + _redis_urls: Option>, + _caller: &str, + ) -> Self { Self { inner: Arc::new(Mutex::new(LruCache::new(max_capacity))), #[cfg(feature = "experimental_cache")] redis: if let Some(urls) = _redis_urls { - Some(RedisCacheStorage::new(urls).await) + match RedisCacheStorage::new(urls).await { + Err(e) => { + tracing::error!( + "could not open connection to Redis for {} caching: {:?}", + _caller, + e + ); + None + } + Ok(storage) => Some(storage), + } } else { None }, @@ -103,149 +119,3 @@ where self.inner.lock().await.len() } } - -#[cfg(feature = "experimental_cache")] -mod redis_storage { - use std::fmt; - use std::sync::Arc; - - use redis::AsyncCommands; - use redis::FromRedisValue; - use redis::RedisResult; - use redis::RedisWrite; - use redis::ToRedisArgs; - use redis_cluster_async::Client; - use redis_cluster_async::Connection; - use tokio::sync::Mutex; - - use super::KeyType; - use super::ValueType; - - #[derive(Clone, Debug, Eq, Hash, PartialEq)] - pub(crate) struct RedisKey(pub(crate) K) - where - K: KeyType; - - #[derive(Clone, Debug)] - pub(crate) struct RedisValue(pub(crate) V) - where - V: ValueType; - - #[derive(Clone)] - pub(crate) struct RedisCacheStorage { - inner: Arc>, - } - - fn get_type_of(_: &T) -> &'static str { - std::any::type_name::() - } - - impl fmt::Display for RedisKey - where - K: KeyType, - { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } - } - - impl ToRedisArgs for RedisKey - where - K: KeyType, - { - fn write_redis_args(&self, out: &mut W) - where - W: ?Sized + RedisWrite, - { - out.write_arg_fmt(self); - } - } - - impl fmt::Display for RedisValue - where - V: ValueType, - { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}|{:?}", get_type_of(&self.0), self.0) - } - } - - impl ToRedisArgs for RedisValue - where - V: ValueType, - { - fn write_redis_args(&self, out: &mut W) - where - W: ?Sized + RedisWrite, - { - let v = serde_json::to_vec(&self.0).unwrap(); - out.write_arg(&v); - } - } - - impl FromRedisValue for RedisValue - where - V: ValueType, - { - fn from_redis_value(v: &redis::Value) -> RedisResult { - match v { - redis::Value::Bulk(bulk_data) => { - for entry in bulk_data { - tracing::trace!("entry: {:?}", entry); - // entry.parse::().unwrap() - } - Err(redis::RedisError::from(( - redis::ErrorKind::TypeError, - "the data is the wrong type", - ))) - } - redis::Value::Data(v) => serde_json::from_slice(v).map(RedisValue).map_err(|e| { - redis::RedisError::from(( - redis::ErrorKind::TypeError, - "can't deserialize from JSON", - e.to_string(), - )) - }), - res => Err(redis::RedisError::from(( - redis::ErrorKind::TypeError, - "the data is the wrong type", - format!("{:?}", res), - ))), - } - } - } - - impl RedisCacheStorage { - pub(crate) async fn new(urls: Vec) -> Self { - let client = Client::open(urls).expect("opening ClusterClient"); - let connection = client.get_connection().await.expect("got redis connection"); - - tracing::trace!("redis connection established"); - Self { - inner: Arc::new(Mutex::new(connection)), - } - } - - pub(crate) async fn get( - &self, - key: RedisKey, - ) -> Option> { - tracing::trace!("getting from redis: {:?}", key); - let mut guard = self.inner.lock().await; - guard.get(key).await.ok() - } - - pub(crate) async fn insert( - &self, - key: RedisKey, - value: RedisValue, - ) { - tracing::trace!("inserting into redis: {:?}, {:?}", key, value); - let mut guard = self.inner.lock().await; - let r = guard - .set::, RedisValue, redis::Value>(key, value) - .await; - tracing::trace!("insert result {:?}", r); - } - } -} diff --git a/apollo-router/src/configuration/mod.rs b/apollo-router/src/configuration/mod.rs index 721004b4ee..a4be643455 100644 --- a/apollo-router/src/configuration/mod.rs +++ b/apollo-router/src/configuration/mod.rs @@ -374,6 +374,7 @@ impl Configuration { }, ); } + Ok(self) } } diff --git a/apollo-router/src/introspection.rs b/apollo-router/src/introspection.rs index 1dbbcc3770..90b00a03d0 100644 --- a/apollo-router/src/introspection.rs +++ b/apollo-router/src/introspection.rs @@ -21,7 +21,7 @@ pub(crate) struct Introspection { impl Introspection { pub(crate) async fn with_capacity(configuration: &Configuration, capacity: usize) -> Self { Self { - cache: CacheStorage::new(capacity, None).await, + cache: CacheStorage::new(capacity, None, "introspection").await, defer_support: configuration.supergraph.preview_defer_support, } } diff --git a/apollo-router/src/query_planner/caching_query_planner.rs b/apollo-router/src/query_planner/caching_query_planner.rs index da2ed9c789..7f4d2861be 100644 --- a/apollo-router/src/query_planner/caching_query_planner.rs +++ b/apollo-router/src/query_planner/caching_query_planner.rs @@ -1,3 +1,5 @@ +// This entire file is license key functionality + use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; @@ -38,8 +40,10 @@ where schema_id: Option, config: &crate::configuration::QueryPlanning, ) -> CachingQueryPlanner { - let cache = - Arc::new(DeduplicatingCache::from_configuration(&config.experimental_cache).await); + let cache = Arc::new( + DeduplicatingCache::from_configuration(&config.experimental_cache, "query planner") + .await, + ); Self { cache, delegate, @@ -190,7 +194,7 @@ impl std::fmt::Display for CachingQueryKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "plan|{}|{}|{}", + "plan\0{}\0{}\0{}", self.schema_id.as_deref().unwrap_or("-"), self.query, self.operation.as_deref().unwrap_or("-") diff --git a/apollo-router/src/router.rs b/apollo-router/src/router.rs index 41352ca776..7e0d741ac0 100644 --- a/apollo-router/src/router.rs +++ b/apollo-router/src/router.rs @@ -36,6 +36,7 @@ use self::Event::UpdateSchema; use crate::axum_factory::make_axum_router; use crate::axum_factory::AxumHttpServerFactory; use crate::axum_factory::ListenAddrAndRouter; +use crate::cache::DeduplicatingCache; use crate::configuration::Configuration; use crate::configuration::ListenAddr; use crate::plugin::DynPlugin; @@ -64,7 +65,13 @@ async fn make_transport_service( .create(configuration.clone(), schema, None, Some(extra_plugins)) .await?; - let apq = APQLayer::new().await; + let apq = APQLayer::with_cache( + DeduplicatingCache::from_configuration( + &configuration.supergraph.apq.experimental_cache, + "APQ", + ) + .await, + ); let web_endpoints = service_factory.web_endpoints(); let routers = make_axum_router(service_factory, &configuration, web_endpoints, apq)?; // FIXME: how should diff --git a/apollo-router/src/services/layers/apq.rs b/apollo-router/src/services/layers/apq.rs index 6f8f309876..18d82f5166 100644 --- a/apollo-router/src/services/layers/apq.rs +++ b/apollo-router/src/services/layers/apq.rs @@ -3,6 +3,8 @@ //! For more information on APQ see: //! +// This entire file is license key functionality + use std::ops::ControlFlow; use futures::future::BoxFuture; @@ -38,12 +40,6 @@ pub(crate) struct APQLayer { } impl APQLayer { - pub(crate) async fn new() -> Self { - Self { - cache: DeduplicatingCache::new().await, - } - } - pub(crate) fn with_cache(cache: DeduplicatingCache) -> Self { Self { cache } } @@ -123,14 +119,14 @@ pub(crate) async fn apq_request( if query_matches_hash(query.as_str(), query_hash_bytes.as_slice()) { tracing::trace!("apq: cache insert"); let _ = request.context.insert("persisted_query_hit", false); - cache.insert(format!("apq|{query_hash}"), query).await; + cache.insert(redis_key(&query_hash), query).await; } else { tracing::warn!("apq: graphql request doesn't match provided sha256Hash"); } Ok(request) } (Some((apq_hash, _)), _) => { - if let Ok(cached_query) = cache.get(&format!("apq|{apq_hash}")).await.get().await { + if let Ok(cached_query) = cache.get(&redis_key(&apq_hash)).await.get().await { let _ = request.context.insert("persisted_query_hit", true); tracing::trace!("apq: cache hit"); request.supergraph_request.body_mut().query = Some(cached_query); @@ -171,6 +167,10 @@ fn query_matches_hash(query: &str, hash: &[u8]) -> bool { hash == digest.finalize().as_slice() } +fn redis_key(query_hash: &str) -> String { + format!("apq\0{query_hash}") +} + #[cfg(test)] mod apq_tests { use std::borrow::Cow; diff --git a/apollo-router/src/test_harness.rs b/apollo-router/src/test_harness.rs index 93de732c4b..f22450d91b 100644 --- a/apollo-router/src/test_harness.rs +++ b/apollo-router/src/test_harness.rs @@ -5,6 +5,7 @@ use tower::BoxError; use tower::Layer; use tower::ServiceExt; +use crate::cache::DeduplicatingCache; use crate::configuration::Configuration; use crate::plugin::test::canned; use crate::plugin::test::MockSubgraph; @@ -210,8 +211,14 @@ impl<'a> TestHarness<'a> { /// Builds the GraphQL service pub async fn build(self) -> Result { - let (_config, router_creator) = self.build_common().await?; - let apq = APQLayer::new().await; + let (configuration, router_creator) = self.build_common().await?; + let apq = APQLayer::with_cache( + DeduplicatingCache::from_configuration( + &configuration.supergraph.apq.experimental_cache, + "APQ", + ) + .await, + ); Ok(tower::service_fn(move |request| { // APQ must be added here because it is implemented in the HTTP server @@ -230,7 +237,13 @@ impl<'a> TestHarness<'a> { let (config, router_creator) = self.build_common().await?; let web_endpoints = router_creator.web_endpoints(); - let apq = APQLayer::new().await; + let apq = APQLayer::with_cache( + DeduplicatingCache::from_configuration( + &config.supergraph.apq.experimental_cache, + "APQ", + ) + .await, + ); let routers = make_axum_router(router_creator, &config, web_endpoints, apq)?; let ListenAddrAndRouter(_listener, router) = routers.main;