Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Bug fix for IndicesRequestCache StaleKey management #13070

Merged
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
2911c25
Update IndicesRequestCache.java
kiranprakash154 Apr 3, 2024
05610d3
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 3, 2024
917df93
Update CHANGELOG.md
kiranprakash154 Apr 3, 2024
ac1464f
revert
kiranprakash154 Apr 4, 2024
f2d72f2
revert
kiranprakash154 Apr 4, 2024
148ff74
Update IndicesRequestCache.java
kiranprakash154 Apr 4, 2024
d8451c2
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 4, 2024
ad88936
Update IndicesRequestCache.java
kiranprakash154 Apr 4, 2024
87e5c46
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 4, 2024
6b36ee6
Update IndicesRequestCache.java
kiranprakash154 Apr 8, 2024
bebfc3a
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
4179f46
Merge branch 'main' into kp/stalekey-status-check
kiranprakash154 Apr 8, 2024
f42ebb8
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
56f4e1f
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
6630cfc
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
5de80d0
Update IndicesRequestCache.java
kiranprakash154 Apr 8, 2024
139cbfc
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 8, 2024
eaeba38
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 9, 2024
6122aa2
Update IndicesRequestCache.java
kiranprakash154 Apr 9, 2024
e870a98
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 9, 2024
f658de6
Merge branch 'main' into kp/stalekey-status-check
kiranprakash154 Apr 9, 2024
bed1121
Update IndicesRequestCache.java
kiranprakash154 Apr 9, 2024
e178ea7
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 9, 2024
e14a54c
Merge branch 'kp/stalekey-status-check' of https://github.com/kiranpr…
kiranprakash154 Apr 9, 2024
3a2e852
code comments only
kiranprakash154 Apr 10, 2024
5989e1d
docs changes
kiranprakash154 Apr 10, 2024
68b949b
Update CHANGELOG.md
kiranprakash154 Apr 10, 2024
dd91613
revert catching AlreadyClosedException
kiranprakash154 Apr 11, 2024
707f8bd
assert
kiranprakash154 Apr 11, 2024
00dee8d
Merge branch 'main' into kp/stalekey-status-check
kiranprakash154 Apr 12, 2024
70cc990
conflicts
kiranprakash154 Apr 12, 2024
b6b5f2b
Update IndicesRequestCacheTests.java
kiranprakash154 Apr 12, 2024
4c69187
Update IndicesRequestCache.java
kiranprakash154 Apr 12, 2024
31d9c54
address comments
kiranprakash154 Apr 12, 2024
9aeb82d
Update IndicesRequestCache.java
kiranprakash154 Apr 12, 2024
8bfa6c4
Update IndicesRequestCache.java
kiranprakash154 Apr 12, 2024
e37282f
Merge branch 'main' into kp/stalekey-status-check
kiranprakash154 Apr 23, 2024
d5cdf50
address conflicts
kiranprakash154 Apr 23, 2024
9068227
spotless apply
kiranprakash154 Apr 23, 2024
5a15c87
address comments
kiranprakash154 Apr 23, 2024
430e57c
update code comments
kiranprakash154 Apr 23, 2024
581ea2a
address bug & add tests
kiranprakash154 Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix UOE While building Exists query for nested search_as_you_type field ([#12048](https://github.com/opensearch-project/OpenSearch/pull/12048))
- Client with Java 8 runtime and Apache HttpClient 5 Transport fails with java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer ([#13100](https://github.com/opensearch-project/opensearch-java/pull/13100))
- Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098))
- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070)]

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.service.CacheService;
Expand Down Expand Up @@ -210,9 +211,8 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
// shards as part of request cache.
Key key = notification.getKey();
cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification));
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction(
new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
);
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, notification);
}

BytesReference getOrCompute(
Expand Down Expand Up @@ -241,10 +241,11 @@ BytesReference getOrCompute(
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
}
}
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey);
cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey);
} else {
cacheEntity.onHit();
}

return value;
}

Expand Down Expand Up @@ -477,7 +478,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
*
* @param cleanupKey the CleanupKey to be updated in the map
*/
private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
return;
}
Expand All @@ -493,8 +494,30 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
}

private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
/**
* Updates the cleanupKeyToCountMap and staleKeysCount when a cache eviction occurs.
*
* <p>This method is called when an entry is evicted from the cache.
* It decrements the count of the entry in the cleanupKeyToCountMap.
* It also decrements the staleKeysCount only if the entry was accounted.
* If the count of the CleanupKey becomes zero, it removes the CleanupKey from the map.
sohami marked this conversation as resolved.
Show resolved Hide resolved
*
* <p> We update the cleanupKeyToCountMap on every key removed from cache
* except for the keys removed with reason Replaced
*
* @param cleanupKey the CleanupKey that has been evicted from the cache
* @param notification RemovalNotification of the cache entry evicted
*/
private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotification<Key, BytesReference> notification) {
if (notification.getRemovalReason() == RemovalReason.REPLACED) {
// The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
// does not affect the staleness count, we skip such notifications.
return;
}
if (cleanupKey.entity == null) {
// on shard close, the shard is still lying around so this will only happen when the shard is deleted.
// we would have accounted this in staleKeysCount when the deletion of shard would have closed the associated readers
staleKeysCount.decrementAndGet();
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand All @@ -504,23 +527,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
}
ShardId shardId = indexShard.shardId();

cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> {
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> {
// decrement the stale key count
cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> {
if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) {
// If ShardId is not present or readerCacheKeyId is not present
// it should have already been accounted for and hence been removed from this map
// so decrement staleKeysCount
staleKeysCount.decrementAndGet();
int newValue = currentValue - 1;
// Remove the key if the new value is zero by returning null; otherwise, update with the new value.
return newValue == 0 ? null : newValue;
});
return keyCountMap;
// Returning null removes the entry for the shardId, if it exists
return null;
sohami marked this conversation as resolved.
Show resolved Hide resolved
} else {
// If it is in the map, it is not stale yet.
// Proceed to adjust the count for the readerCacheKeyId in the map
// but do not decrement the staleKeysCount
Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId);
// this should never be null
assert (count != null && count >= 0);
// Reduce the count by 1
int newCount = count - 1;
if (newCount <= 0) {
kiranprakash154 marked this conversation as resolved.
Show resolved Hide resolved
// Remove the readerCacheKeyId entry if new count is zero or less
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
} else {
// Update the map with the new count
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
}
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
}
});
}

/**
* Updates the count of stale keys in the cache.
* This method is called when a CleanupKey is added to the keysToClean set.
*
* It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* <p>It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
*
Expand All @@ -538,7 +579,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
ShardId shardId = indexShard.shardId();

// Using computeIfPresent to atomically operate on the countMap for a given shardId
cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> {
cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> {
if (cleanupKey.readerCacheKeyId == null) {
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
Expand All @@ -547,18 +588,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
return null;
} else {
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> {
staleKeysCount.addAndGet(v);
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> {
staleKeysCount.addAndGet(count);
// Return null to remove the key after updating staleKeysCount
return null;
});

// Check if countMap is empty after removal to decide if we need to remove the shardId entry
if (countMap.isEmpty()) {
return null; // Returning null removes the entry for shardId
// Returning null removes the entry for shardId
return null;
}
}
return countMap; // Return the modified countMap to keep the mapping
// Return the modified countMap to retain updates
return countMap;
});
}

Expand Down Expand Up @@ -673,6 +715,11 @@ public void close() {
this.cacheCleaner.close();
}

// for testing
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
kiranprakash154 marked this conversation as resolved.
Show resolved Hide resolved
}

private final class IndicesRequestCacheCleaner implements Runnable, Releasable {

private final IndicesRequestCacheCleanupManager cacheCleanupManager;
Expand Down
Loading
Loading