Skip to content

Commit

Permalink
[Caching] Move cache removal notifications outside lru lock (opensear…
Browse files Browse the repository at this point in the history
…ch-project#14017)

---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 authored and Peter Alfonsi committed Sep 3, 2024
1 parent c5fd7fe commit 406ae78
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
53 changes: 46 additions & 7 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.concurrent.ReleasableLock;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -394,7 +396,12 @@ private V get(K key, long now, Consumer<Entry<K, V>> onExpiration) {
if (entry == null) {
return null;
} else {
promote(entry, now);
List<RemovalNotification<K, V>> removalNotifications = promote(entry, now).v2();
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
return entry.value;
}
}
Expand Down Expand Up @@ -444,8 +451,14 @@ private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
removalNotifications = promote(ok, now).v2();
}
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
return ok.value;
} else {
Expand Down Expand Up @@ -510,16 +523,22 @@ private void put(K key, V value, long now) {
CacheSegment<K, V> segment = getCacheSegment(key);
Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now);
boolean replaced = false;
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
if (unlink(tuple.v2())) {
replaced = true;
}
}
promote(tuple.v1(), now);
removalNotifications = promote(tuple.v1(), now).v2();
}
if (replaced) {
removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
}
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
}

Expand Down Expand Up @@ -764,8 +783,17 @@ public long getEvictions() {
}
}

private boolean promote(Entry<K, V> entry, long now) {
/**
* Promotes the desired entry to the head of the lru list and tries to see if it needs to evict any entries in
* case the cache size is exceeding or the entry got expired.
* @param entry Entry to be promoted
* @param now the current time
* @return Returns a tuple. v1 signifies whether an entry got promoted, v2 signifies the list of removal
* notifications that the callers needs to handle.
*/
private Tuple<Boolean, List<RemovalNotification<K, V>>> promote(Entry<K, V> entry, long now) {
boolean promoted = true;
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
switch (entry.state) {
case DELETED:
Expand All @@ -779,10 +807,21 @@ private boolean promote(Entry<K, V> entry, long now) {
break;
}
if (promoted) {
evict(now);
while (tail != null && shouldPrune(tail, now)) {
Entry<K, V> entryToBeRemoved = tail;
CacheSegment<K, V> segment = getCacheSegment(entryToBeRemoved.key);
if (segment != null) {
segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {});
}
if (unlink(entryToBeRemoved)) {
removalNotifications.add(
new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, RemovalReason.EVICTED)
);
}
}
}
}
return promoted;
return new Tuple<>(promoted, removalNotifications);
}

private void evict(long now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,10 @@
*/
@FunctionalInterface
public interface RemovalListener<K, V> {

/**
* This may be called from multiple threads at once. So implementation needs to be thread safe.
* @param notification removal notification for desired entry.
*/
void onRemoval(RemovalNotification<K, V> notification);
}

0 comments on commit 406ae78

Please sign in to comment.