Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ By [@bnjjj](https://github.com/bnjjj) in https://github.com/apollographql/router

## 🐛 Fixes

### Fix the deduplication logic in deduplication caching [Issue #1984](https://github.com/apollographql/router/issues/1984))

Under load, it is possible to break the router deduplication logic and leave orphaned entries in the waiter map. This fixes the logic to prevent this from occurring.

By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/2014

### Follow directives from Uplink ([Issue #1494](https://github.com/apollographql/router/issues/1494) [Issue #1539](https://github.com/apollographql/router/issues/1539))

The Uplink API returns actionable info in its responses:
Expand Down
39 changes: 19 additions & 20 deletions apollo-router/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ where
None => {
let (sender, _receiver) = broadcast::channel(1);

let k = key.clone();
// when _drop_signal is dropped, either by getting out of the block, returning
// the error from ready_oneshot or by cancellation, the drop_sentinel future will
// return with Err(), then we remove the entry from the wait map
let (_drop_signal, drop_sentinel) = oneshot::channel::<()>();
let wait_map = self.wait_map.clone();
tokio::task::spawn(async move {
let _ = drop_sentinel.await;
let mut locked_wait_map = wait_map.lock().await;
let _ = locked_wait_map.remove(&k);
});

locked_wait_map.insert(key.clone(), sender.clone());

// we must not hold a lock over the wait map while we are waiting for a value from the
Expand All @@ -63,27 +75,13 @@ where
drop(locked_wait_map);

if let Some(value) = self.storage.get(key).await {
let mut locked_wait_map = self.wait_map.lock().await;
let _ = locked_wait_map.remove(key);
let _ = sender.send(value.clone());
self.send(sender, key, value.clone()).await;

return Entry {
inner: EntryInner::Value(value),
};
}

let k = key.clone();
// when _drop_signal is dropped, either by getting out of the block, returning
// the error from ready_oneshot or by cancellation, the drop_sentinel future will
// return with Err(), then we remove the entry from the wait map
let (_drop_signal, drop_sentinel) = oneshot::channel::<()>();
let wait_map = self.wait_map.clone();
tokio::task::spawn(async move {
let _ = drop_sentinel.await;
let mut locked_wait_map = wait_map.lock().await;
let _ = locked_wait_map.remove(&k);
});

Entry {
inner: EntryInner::First {
sender,
Expand All @@ -100,9 +98,12 @@ where
self.storage.insert(key, value.clone()).await;
}

pub(crate) async fn remove_wait(&self, key: &K) {
async fn send(&self, sender: broadcast::Sender<V>, key: &K, value: V) {
// Lock the wait map to prevent more subscribers racing with our send
// notification
let mut locked_wait_map = self.wait_map.lock().await;
let _ = locked_wait_map.remove(key);
let _ = sender.send(value);
}
}

Expand Down Expand Up @@ -157,8 +158,7 @@ where
} = self.inner
{
cache.insert(key.clone(), value.clone()).await;
cache.remove_wait(&key).await;
let _ = sender.send(value);
cache.send(sender, &key, value).await;
}
}

Expand All @@ -169,8 +169,7 @@ where
sender, cache, key, ..
} = self.inner
{
let _ = sender.send(value);
cache.remove_wait(&key).await;
cache.send(sender, &key, value).await;
}
}
}
Expand Down