Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,14 @@ to validate the data sent back to the client. Those query shapes were invalid fo

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2102

### *Experimental* 🥼 APQ and query planner Redis caching fixes ([PR #2176](https://github.com/apollographql/router/pull/2176))

* use a null byte as separator in Redis keys
* handle Redis connection errors
* mark APQ and query plan caching as license key functionality

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/2176

## 🛠 Maintenance

### Verify that deferred fragment acts as a boundary for nullability rules ([Issue #2169](https://github.com/apollographql/router/issues/2169))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ impl HttpServerFactory for AxumHttpServerFactory {
let apq = APQLayer::with_cache(
DeduplicatingCache::from_configuration(
&configuration.supergraph.apq.experimental_cache,
"APQ",
)
.await,
);
Expand Down
25 changes: 18 additions & 7 deletions apollo-router/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use self::storage::CacheStorage;
use self::storage::KeyType;
use self::storage::ValueType;

#[cfg(feature = "experimental_cache")]
mod redis;
pub(crate) mod storage;

type WaitMap<K, V> = Arc<Mutex<HashMap<K, broadcast::Sender<V>>>>;
Expand All @@ -26,24 +28,33 @@ where
K: KeyType + 'static,
V: ValueType + 'static,
{
#[cfg(test)]
pub(crate) async fn new() -> Self {
Self::with_capacity(DEFAULT_CACHE_CAPACITY, None).await
Self::with_capacity(DEFAULT_CACHE_CAPACITY, None, "test").await
}

pub(crate) async fn with_capacity(capacity: usize, redis_urls: Option<Vec<String>>) -> Self {
pub(crate) async fn with_capacity(
capacity: usize,
redis_urls: Option<Vec<String>>,
caller: &str,
) -> Self {
Self {
wait_map: Arc::new(Mutex::new(HashMap::new())),
storage: CacheStorage::new(capacity, redis_urls).await,
storage: CacheStorage::new(capacity, redis_urls, caller).await,
}
}

pub(crate) async fn from_configuration(config: &crate::configuration::Cache) -> Self {
pub(crate) async fn from_configuration(
config: &crate::configuration::Cache,
caller: &str,
) -> Self {
Self::with_capacity(
config.in_memory.limit,
#[cfg(feature = "experimental_cache")]
config.redis.as_ref().map(|c| c.urls.clone()),
#[cfg(not(feature = "experimental_cache"))]
None,
caller,
)
.await
}
Expand Down Expand Up @@ -198,7 +209,7 @@ mod tests {
#[tokio::test]
async fn example_cache_usage() {
let k = "key".to_string();
let cache = DeduplicatingCache::with_capacity(1, None).await;
let cache = DeduplicatingCache::with_capacity(1, None, "test").await;

let entry = cache.get(&k).await;

Expand All @@ -215,7 +226,7 @@ mod tests {
#[test(tokio::test)]
async fn it_should_enforce_cache_limits() {
let cache: DeduplicatingCache<usize, usize> =
DeduplicatingCache::with_capacity(13, None).await;
DeduplicatingCache::with_capacity(13, None, "test").await;

for i in 0..14 {
let entry = cache.get(&i).await;
Expand All @@ -238,7 +249,7 @@ mod tests {
mock.expect_retrieve().times(1).return_const(1usize);

let cache: DeduplicatingCache<usize, usize> =
DeduplicatingCache::with_capacity(10, None).await;
DeduplicatingCache::with_capacity(10, None, "test").await;

// Let's trigger 100 concurrent gets of the same value and ensure only
// one delegated retrieve is made
Expand Down
144 changes: 144 additions & 0 deletions apollo-router/src/cache/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// This entire file is license key functionality

use std::fmt;
use std::sync::Arc;

use redis::AsyncCommands;
use redis::FromRedisValue;
use redis::RedisResult;
use redis::RedisWrite;
use redis::ToRedisArgs;
use redis_cluster_async::Client;
use redis_cluster_async::Connection;
use tokio::sync::Mutex;

use super::KeyType;
use super::ValueType;

/// Newtype wrapper marking a value as a Redis cache key.
///
/// Wrapping the generic `K` lets this module implement the redis crate's
/// serialization traits (`ToRedisArgs`) without a blanket impl on foreign types.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(crate) struct RedisKey<K>(pub(crate) K)
where
    K: KeyType;

/// Newtype wrapper marking a value as a Redis cache payload.
///
/// Values are stored as JSON (see the `ToRedisArgs`/`FromRedisValue` impls
/// below), so `V` only needs to round-trip through serde_json.
#[derive(Clone, Debug)]
pub(crate) struct RedisValue<V>(pub(crate) V)
where
    V: ValueType;

/// Cache storage backed by a Redis cluster connection.
///
/// Cloning is cheap: every clone shares the same underlying connection.
#[derive(Clone)]
pub(crate) struct RedisCacheStorage {
    // Single shared cluster connection. A tokio (async) Mutex is used because
    // the guard is held across `.await` points in `get`/`insert` below.
    inner: Arc<Mutex<Connection>>,
}

/// Returns the compile-time name of the type of the referenced value.
fn get_type_of<T>(_value: &T) -> &'static str {
    std::any::type_name::<T>()
}

impl<K> fmt::Display for RedisKey<K>
where
    K: KeyType,
{
    /// Formats the key exactly as its inner value's `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.0))
    }
}

impl<K> ToRedisArgs for RedisKey<K>
where
    K: KeyType,
{
    /// Serializes the key as a single Redis argument, using the `Display`
    /// representation of the inner value (see the `fmt::Display` impl above).
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + RedisWrite,
    {
        out.write_arg_fmt(self);
    }
}

impl<V> fmt::Display for RedisValue<V>
where
    V: ValueType,
{
    /// Renders the value as `<type name>|<Debug representation>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `self.0` has type `V`, so naming the type directly is equivalent
        // to asking `get_type_of` for the value's type name.
        let type_name = std::any::type_name::<V>();
        write!(f, "{}|{:?}", type_name, self.0)
    }
}

impl<V> ToRedisArgs for RedisValue<V>
where
    V: ValueType,
{
    /// Serializes the value to JSON and writes it as a single Redis argument.
    ///
    /// Panics if serde_json cannot serialize the value; `ValueType` values
    /// are expected to always be JSON-serializable.
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + RedisWrite,
    {
        let v = serde_json::to_vec(&self.0)
            .expect("JSON serialization should not fail for redis values");
        out.write_arg(&v);
    }
}

impl<V> FromRedisValue for RedisValue<V>
where
    V: ValueType,
{
    /// Deserializes a raw Redis reply back into a `RedisValue`.
    ///
    /// Only a single binary payload (`Value::Data`) is accepted, and it must
    /// contain JSON matching `V`. Everything else maps to a `TypeError`.
    fn from_redis_value(v: &redis::Value) -> RedisResult<Self> {
        match v {
            // A multi-entry (bulk) reply is never a valid cached value; dump
            // its entries at trace level to aid debugging, then reject it.
            redis::Value::Bulk(bulk_data) => {
                for entry in bulk_data {
                    tracing::trace!("entry: {:?}", entry);
                }
                Err(redis::RedisError::from((
                    redis::ErrorKind::TypeError,
                    "the data is the wrong type",
                )))
            }
            // The expected case: a binary payload holding the JSON document.
            redis::Value::Data(v) => serde_json::from_slice(v).map(RedisValue).map_err(|e| {
                redis::RedisError::from((
                    redis::ErrorKind::TypeError,
                    "can't deserialize from JSON",
                    e.to_string(),
                ))
            }),
            // Catch-all for the remaining reply variants. Note that a missing
            // key (a Nil reply) is not matched above, so a plain cache miss
            // also lands here and surfaces as an `Err`.
            res => Err(redis::RedisError::from((
                redis::ErrorKind::TypeError,
                "the data is the wrong type",
                format!("{:?}", res),
            ))),
        }
    }
}

impl RedisCacheStorage {
pub(crate) async fn new(urls: Vec<String>) -> Result<Self, redis::RedisError> {
let client = Client::open(urls)?;
let connection = client.get_connection().await?;

tracing::trace!("redis connection established");
Ok(Self {
inner: Arc::new(Mutex::new(connection)),
})
}

pub(crate) async fn get<K: KeyType, V: ValueType>(
&self,
key: RedisKey<K>,
) -> Option<RedisValue<V>> {
tracing::trace!("getting from redis: {:?}", key);
let mut guard = self.inner.lock().await;
guard.get(key).await.ok()
}

pub(crate) async fn insert<K: KeyType, V: ValueType>(
&self,
key: RedisKey<K>,
value: RedisValue<V>,
) {
tracing::trace!("inserting into redis: {:?}, {:?}", key, value);
let mut guard = self.inner.lock().await;
let r = guard
.set::<RedisKey<K>, RedisValue<V>, redis::Value>(key, value)
.await;
tracing::trace!("insert result {:?}", r);
}
}
Loading