diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 9ff18f2bdc..0078045ac7 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -50,6 +50,12 @@ By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router ## 🐛 Fixes +### Fix the deduplication logic in deduplication caching ([Issue #1984](https://github.com/apollographql/router/issues/1984)) + +Under load, it is possible to break the router deduplication logic and leave orphaned entries in the waiter map. This fixes the logic to prevent this from occurring. + +By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/2014 + ### Follow directives from Uplink ([Issue #1494](https://github.com/apollographql/router/issues/1494) [Issue #1539](https://github.com/apollographql/router/issues/1539)) The Uplink API returns actionable info in its responses: diff --git a/apollo-router/src/cache/mod.rs b/apollo-router/src/cache/mod.rs index 2cf7559b81..e76411cf37 100644 --- a/apollo-router/src/cache/mod.rs +++ b/apollo-router/src/cache/mod.rs @@ -55,6 +55,18 @@ where None => { let (sender, _receiver) = broadcast::channel(1); + let k = key.clone(); + // when _drop_signal is dropped, either by getting out of the block, returning + // the error from ready_oneshot or by cancellation, the drop_sentinel future will + // return with Err(), then we remove the entry from the wait map + let (_drop_signal, drop_sentinel) = oneshot::channel::<()>(); + let wait_map = self.wait_map.clone(); + tokio::task::spawn(async move { + let _ = drop_sentinel.await; + let mut locked_wait_map = wait_map.lock().await; + let _ = locked_wait_map.remove(&k); + }); + locked_wait_map.insert(key.clone(), sender.clone()); // we must not hold a lock over the wait map while we are waiting for a value from the @@ -63,27 +75,13 @@ where drop(locked_wait_map); if let Some(value) = self.storage.get(key).await { - let mut locked_wait_map = self.wait_map.lock().await; - let _ = locked_wait_map.remove(key); - 
let _ = sender.send(value.clone()); + self.send(sender, key, value.clone()).await; return Entry { inner: EntryInner::Value(value), }; } - let k = key.clone(); - // when _drop_signal is dropped, either by getting out of the block, returning - // the error from ready_oneshot or by cancellation, the drop_sentinel future will - // return with Err(), then we remove the entry from the wait map - let (_drop_signal, drop_sentinel) = oneshot::channel::<()>(); - let wait_map = self.wait_map.clone(); - tokio::task::spawn(async move { - let _ = drop_sentinel.await; - let mut locked_wait_map = wait_map.lock().await; - let _ = locked_wait_map.remove(&k); - }); - Entry { inner: EntryInner::First { sender, @@ -100,9 +98,12 @@ where self.storage.insert(key, value.clone()).await; } - pub(crate) async fn remove_wait(&self, key: &K) { + async fn send(&self, sender: broadcast::Sender, key: &K, value: V) { + // Lock the wait map to prevent more subscribers racing with our send + // notification let mut locked_wait_map = self.wait_map.lock().await; let _ = locked_wait_map.remove(key); + let _ = sender.send(value); } } @@ -157,8 +158,7 @@ where } = self.inner { cache.insert(key.clone(), value.clone()).await; - cache.remove_wait(&key).await; - let _ = sender.send(value); + cache.send(sender, &key, value).await; } } @@ -169,8 +169,7 @@ where sender, cache, key, .. } = self.inner { - let _ = sender.send(value); - cache.remove_wait(&key).await; + cache.send(sender, &key, value).await; } } }