39 changes: 29 additions & 10 deletions apollo-router/src/cache/mod.rs
@@ -55,23 +55,42 @@ where
None => {
let (sender, _receiver) = broadcast::channel(1);

locked_wait_map.insert(key.clone(), sender.clone());

// we must not hold a lock over the wait map while we are waiting for a value from the
// cache. This way, other tasks can come and register interest in the same key, or
// request other keys independently
drop(locked_wait_map);

if let Some(value) = self.storage.get(key).await {
let mut locked_wait_map = self.wait_map.lock().await;
let _ = locked_wait_map.remove(key);
let _ = sender.send(value.clone());

return Entry {
inner: EntryInner::Value(value),
};
}

// Note: We previously inserted the waiter before checking self.storage. That
// involved dropping the lock and then removing the waiter if we got a cache hit.
// Unfortunately, this removal raced with the two other removes, and it could
// easily lead to scenarios where a client would end up waiting for a
// broadcast message that would never arrive. This manifests as a hung router for
// the specific failed query.
// The most common trigger was many clients hitting the router with the same
// query while the router was overloaded enough to fail when processing
// connections. This led to contention over the wait map between:
// - the completing First Entry client
// - clients hitting the wait map entry and subscribing
// - sentinel completion removing the wait map entry
// I'm not 100% sure what the exact cause of the problem is, but this fairly
// conservative lock strategy is the only fix I've been able to concoct that
// hasn't failed.
// I've tried quite a few versions which create the waiter before accessing
// self.storage, and all of them fail by leaving "dead" waiters in place.

// Don't insert the waiter until *after* we have checked the cache.

locked_wait_map.insert(key.clone(), sender.clone());

// we must not hold a lock over the wait map while we are waiting for a value from the
// delegate. This way, other tasks can come and register interest in the same key, or
// request other keys independently

drop(locked_wait_map);

let k = key.clone();
// when _drop_signal is dropped, either by getting out of the block, returning
// the error from ready_oneshot or by cancellation, the drop_sentinel future will
@@ -100,7 +119,7 @@ where
self.storage.insert(key, value.clone()).await;
}

-pub(crate) async fn remove_wait(&self, key: &K) {
+async fn remove_wait(&self, key: &K) {
let mut locked_wait_map = self.wait_map.lock().await;
let _ = locked_wait_map.remove(key);
}
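
The ordering described in the note (check storage first, register the waiter only on a confirmed miss, all under the wait-map lock) can be illustrated with a small, synchronous, std-only sketch. This is not the router's code: `DedupCache`, `Lookup`, and the `String` key/value types are hypothetical names chosen for illustration, and `std::sync::mpsc` stands in for the `broadcast` channel and async locks used in the diff.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Outcome of a lookup (hypothetical; loosely mirrors the entry kinds in the diff).
enum Lookup {
    /// The value was already in storage.
    Hit(String),
    /// Nothing stored and nothing in flight: the caller must compute the value.
    First,
    /// Another caller is computing the value; wait on this receiver.
    Waiting(Receiver<String>),
}

/// Hypothetical deduplicating cache: one storage map plus one map of waiters.
struct DedupCache {
    storage: Mutex<HashMap<String, String>>,
    wait_map: Mutex<HashMap<String, Vec<Sender<String>>>>,
}

impl DedupCache {
    fn new() -> Self {
        DedupCache {
            storage: Mutex::new(HashMap::new()),
            wait_map: Mutex::new(HashMap::new()),
        }
    }

    fn get(&self, key: &str) -> Lookup {
        // Hold the wait-map lock for the whole decision.
        let mut waiters = self.wait_map.lock().unwrap();

        // Someone is already computing this key: subscribe to the result.
        if let Some(list) = waiters.get_mut(key) {
            let (tx, rx) = channel();
            list.push(tx);
            return Lookup::Waiting(rx);
        }

        // Check storage *before* registering a waiter, so a cache hit never
        // leaves a waiter entry behind that has to be removed afterwards.
        if let Some(value) = self.storage.lock().unwrap().get(key) {
            return Lookup::Hit(value.clone());
        }

        // Confirmed miss: only now register the in-flight entry.
        waiters.insert(key.to_string(), Vec::new());
        Lookup::First
    }

    fn insert(&self, key: &str, value: String) {
        // Store first, so a lookup that arrives after the waiter entry is
        // removed sees the value in storage.
        self.storage.lock().unwrap().insert(key.to_string(), value.clone());

        // Then remove the waiter entry and notify every subscriber.
        let senders = self.wait_map.lock().unwrap().remove(key).unwrap_or_default();
        for tx in senders {
            let _ = tx.send(value.clone());
        }
    }
}
```

In this sketch the first `get` for a key returns `Lookup::First`, concurrent callers receive `Lookup::Waiting`, and after `insert` every later `get` returns `Lookup::Hit`. Because a waiter entry is created only after a confirmed miss, the cache-hit path has no removal that could race with the other removals.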