Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions apollo-router/src/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,19 +665,11 @@ impl RedisCacheStorage {
/// because `NotFound` shouldn't be considered an error.
pub(crate) async fn get_multiple_with_options<K: KeyType, V: ValueType>(
&self,
mut keys: Vec<RedisKey<K>>,
keys: Vec<RedisKey<K>>,
options: Options,
) -> Result<Vec<Result<RedisValue<V>, RedisError>>, RedisError> {
tracing::trace!("getting multiple values from redis: {:?}", keys);
let client = self.client();

if keys.len() == 1 {
let key = self.make_key(keys.swap_remove(0));
let res_value: Result<RedisValue<V>, RedisError> =
client.with_options(&options).get(key).await;
let value = res_value.inspect_err(|e| self.record_error(e))?;
Ok(vec![Ok(value)])
} else if self.is_cluster {
if self.is_cluster {
// we cannot do an MGET across hash slots (error: "ERR CROSSSLOT Keys in request don't
// hash to the same slot").
// we either need to group the keys by hash slot, or just send a GET for each key; given
Expand All @@ -687,7 +679,7 @@ impl RedisCacheStorage {

// then we query all the key groups at the same time
// use `client.replicas()` since we're in a cluster and can take advantage of read-replicas
let client = client.replicas().with_options(&options);
let client = self.client().replicas().with_options(&options);
let mut tasks = Vec::with_capacity(len);
for (index, key) in keys.into_iter().enumerate() {
let client = client.clone();
Expand All @@ -709,7 +701,8 @@ impl RedisCacheStorage {
.into_iter()
.map(|k| self.make_key(k))
.collect::<Vec<_>>();
let values: Vec<Option<RedisValue<V>>> = client
let values: Vec<Option<RedisValue<V>>> = self
.client()
.with_options(&options)
.mget(keys)
.await
Expand Down
26 changes: 15 additions & 11 deletions apollo-router/tests/integration/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1793,25 +1793,29 @@ async fn test_redis_doesnt_use_replicas_in_standalone_mode() {
assert!(redis_monitor_output.command_sent_to_any("GET"));
}

// NB: the `num_products` parameter determines how many `Product` entities will be requested from
// the `reviews` subgraph. This is useful to check that `get_multiple` works properly with different
// numbers of keys.
#[rstest::rstest]
#[tokio::test(flavor = "multi_thread")]
async fn test_redis_uses_replicas_in_clusters_for_mgets() {
async fn test_redis_uses_replicas_in_clusters_for_mgets(#[values(1, 3, 5)] num_products: usize) {
if !graph_os_enabled() {
return;
}

let router_config = include_str!("fixtures/response_cache_redis_cluster.router.yaml");
let mut subgraph_overrides = HashMap::new();

let products = [
json!({"__typename":"Product","upc":"1","name":"Table","reviews":[{"id":"review1"}]}),
json!({"__typename":"Product","upc":"2","name":"Chair","reviews":[{"id":"review2"}]}),
json!({"__typename":"Product","upc":"3","name":"Desk","reviews":[{"id":"review3"}]}),
json!({"__typename":"Product","upc":"4","name":"Lamp","reviews":[{"id":"review4"}]}),
json!({"__typename":"Product","upc":"5","name":"Sofa","reviews":[{"id":"review5"}]}),
];

let products_response = ResponseTemplate::new(200)
.set_body_json(serde_json::json! {{"data": {
"topProducts": [
{"__typename":"Product","upc":"1","name":"Table","reviews":[{"id":"review1"}]},
{"__typename":"Product","upc":"2","name":"Chair","reviews":[{"id":"review2"}]},
{"__typename":"Product","upc":"3","name":"Desk","reviews":[{"id":"review3"}]},
{"__typename":"Product","upc":"4","name":"Lamp","reviews":[{"id":"review4"}]},
{"__typename":"Product","upc":"5","name":"Sofa","reviews":[{"id":"review5"}]}
]
}}})
.set_body_json(serde_json::json! {{"data": {"topProducts": products[..num_products]}}})
.insert_header("cache-control", "max-age=500, public");

let reviews_response = ResponseTemplate::new(200)
Expand Down Expand Up @@ -1899,7 +1903,7 @@ async fn test_redis_uses_replicas_in_clusters_for_mgets() {
let parse_error = r#"apollo_router_cache_redis_errors_total{error_type="parse""#;
router.assert_metrics_does_not_contain(parse_error).await;

let example_cache_key = "version:1.1:subgraph:reviews:type:Product:representation:ddf7d062949ffde207db2ced05093a823d64730d30fac573d6168f13cc8080c5:hash:739583f793fb842194e6be6c6f126df63cc0ee86f8702745ac4630521ab6752d:data:070af9367f9025bd796a1b7e0cd1335246f658aa4857c3a4d6284673b7d07fa6";
let example_cache_key = "version:1.1:subgraph:reviews:type:Product:representation:dd668a3ba05fb2fcd9e372b7063fe717f3ff1c209c29fa8367b8324e49775115:hash:739583f793fb842194e6be6c6f126df63cc0ee86f8702745ac4630521ab6752d:data:070af9367f9025bd796a1b7e0cd1335246f658aa4857c3a4d6284673b7d07fa6";
router
.assert_redis_cache_contains(example_cache_key, None)
.await;
Expand Down