diff --git a/apollo-router/src/cache/mod.rs b/apollo-router/src/cache/mod.rs
index 2cf7559b81..171bb973c4 100644
--- a/apollo-router/src/cache/mod.rs
+++ b/apollo-router/src/cache/mod.rs
@@ -55,16 +55,7 @@ where
             }
             None => {
                 let (sender, _receiver) = broadcast::channel(1);
-                locked_wait_map.insert(key.clone(), sender.clone());
-
-                // we must not hold a lock over the wait map while we are waiting for a value from the
-                // cache. This way, other tasks can come and register interest in the same key, or
-                // request other keys independently
-                drop(locked_wait_map);
-
                 if let Some(value) = self.storage.get(key).await {
-                    let mut locked_wait_map = self.wait_map.lock().await;
-                    let _ = locked_wait_map.remove(key);
                     let _ = sender.send(value.clone());
 
                     return Entry {
@@ -72,6 +63,34 @@ where
                     };
                 }
 
+                // Note: We previously inserted the waiter before checking self.storage. This
+                // involved dropping the lock and then removing the waiter if we got a cache hit.
+                // Unfortunately, this raced with the two other removes and it was very possible
+                // that this would lead to scenarios where a client would end up waiting for a
+                // broadcast message that would never arrive. This manifests as a hung router for
+                // the specific failed query.
+                // The most common cause of this was many clients hitting the router with the same
+                // query whilst over-loading the router to cause it to fail when processing
+                // connections. This led to conflict over the waiter map between:
+                //  - the completing First Entry client
+                //  - clients hitting the wait map entry and subscribing
+                //  - sentinel completion removing the wait map entry
+                // I'm not 100% sure where the exact cause of the problem is, but this fairly
+                // conservative lock strategy is the only fix I've been able to concoct that
+                // hasn't failed.
+                // I've tried quite a few versions which create the waiter before accessing
+                // self.storage and all of them fail by leaving "dead" waiters in place.
+
+                // Don't insert the waiter until *after* we have checked the cache.
+
+                locked_wait_map.insert(key.clone(), sender.clone());
+
+                // we must not hold a lock over the wait map while we are waiting for a value from the
+                // delegate. This way, other tasks can come and register interest in the same key, or
+                // request other keys independently
+
+                drop(locked_wait_map);
+
                 let k = key.clone();
                 // when _drop_signal is dropped, either by getting out of the block, returning
                 // the error from ready_oneshot or by cancellation, the drop_sentinel future will
@@ -100,7 +119,7 @@ where
         self.storage.insert(key, value.clone()).await;
     }
 
-    pub(crate) async fn remove_wait(&self, key: &K) {
+    async fn remove_wait(&self, key: &K) {
         let mut locked_wait_map = self.wait_map.lock().await;
         let _ = locked_wait_map.remove(key);
     }
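
For reviewers who want the resulting control flow in one place, here is a minimal, self-contained sketch of the lock ordering this patch settles on: check the underlying storage *before* registering a waiter, and only release the wait-map lock once the waiter is in place. The names here (`SimpleCache`, `get_or_fetch`, the `fetch` closure, `String` keys and values) are illustrative assumptions, not the router's actual cache API, and the drop-sentinel handling for a cancelled first requester is omitted for brevity.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{broadcast, Mutex};

/// Toy stand-in for the deduplicating cache: `storage` is the real cache,
/// `wait_map` holds one broadcast sender per key currently being computed.
#[derive(Clone, Default)]
struct SimpleCache {
    storage: Arc<Mutex<HashMap<String, String>>>,
    wait_map: Arc<Mutex<HashMap<String, broadcast::Sender<String>>>>,
}

impl SimpleCache {
    async fn get_or_fetch<F, Fut>(&self, key: &str, fetch: F) -> String
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = String>,
    {
        let mut wait_map = self.wait_map.lock().await;

        // Another task is already computing this key: subscribe and wait for its broadcast.
        if let Some(waiter) = wait_map.get(key) {
            let mut rx = waiter.subscribe();
            drop(wait_map);
            // The real code also uses a drop sentinel so a cancelled first requester
            // cannot leave subscribers waiting forever; omitted here for brevity.
            return rx.recv().await.expect("first requester broadcasts before dropping");
        }

        // Cache hit: answer immediately. Crucially, no waiter has been created yet,
        // so there is nothing to remove and nothing to race against.
        if let Some(value) = self.storage.lock().await.get(key) {
            return value.clone();
        }

        // Cache miss: only now register the waiter, *after* the storage check, then
        // release the wait-map lock so other keys (and other requesters of this key)
        // can make progress while we do the expensive work.
        let (sender, _rx) = broadcast::channel(1);
        wait_map.insert(key.to_string(), sender.clone());
        drop(wait_map);

        let value = fetch().await;
        self.storage.lock().await.insert(key.to_string(), value.clone());

        // Remove the waiter before broadcasting so late arrivals fall through to storage.
        self.wait_map.lock().await.remove(key);
        let _ = sender.send(value.clone());
        value
    }
}
```

The key property of this ordering is that the storage check runs while the wait-map lock is still held, so a waiter can never be created for a key whose value is already cached, and there is no window in which a waiter must be removed again after a cache hit.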