From bf175e77fa00d8b6c8ab498d4046eaca1e51bf2d Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Tue, 1 Jul 2025 02:02:03 +0530 Subject: [PATCH 1/5] Fix fetchBlob in TransferManager to compute from file cache only if entry is not present Signed-off-by: Shreyansh Ray --- .../store/remote/utils/TransferManager.java | 18 ++++------ .../remote/utils/TransferManagerTestCase.java | 34 +++++++++++++++---- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index d7f484bb26a79..4e421dea96d95 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -74,17 +74,13 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio try { return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { - CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { - if (cachedIndexInput == null || cachedIndexInput.isClosed()) { - logger.trace("Transfer Manager - IndexInput closed or not in cache"); - // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); - } else { - logger.trace("Transfer Manager - Already in cache"); - // already in the cache and ready to be used (open) - return cachedIndexInput; - } - }); + CachedIndexInput cacheEntry = fileCache.get(key); + if (cacheEntry == null || cacheEntry.isClosed()) { + cacheEntry = fileCache.compute( + key, + (path, cachedIndexInput) -> new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest) + ); + } // Cache entry was either retrieved from the cache or newly added, either // way the reference count has been incremented by one. We can only diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 668eac51b1b81..7d6bcd4b58363 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -13,8 +13,8 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.SimpleFSLockFactory; -import org.opensearch.core.common.breaker.CircuitBreaker; -import org.opensearch.core.common.breaker.NoopCircuitBreaker; +import org.opensearch.common.breaker.TestCircuitBreaker; +import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheFactory; @@ -38,11 +38,8 @@ @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public abstract class TransferManagerTestCase extends OpenSearchTestCase { protected static final int EIGHT_MB = 1024 * 1024 * 8; - protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( - EIGHT_MB * 2, - 1, - new NoopCircuitBreaker(CircuitBreaker.REQUEST) - ); + protected final TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker(); + protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1, testCircuitBreaker); protected MMapDirectory directory; protected TransferManager transferManager; @@ -156,6 +153,29 @@ public void testDownloadFails() throws Exception { MatcherAssert.assertThat(fileCache.usage(), equalTo(0L)); } + public void testCircuitBreakerWhileDownloading() throws IOException { + // fetch blob when circuit breaking is not tripping + try (IndexInput i = fetchBlobWithName("1")) { + assertIndexInputIsFunctional(i); + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB)); + } + // should have entry in file cache + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L)); + MatcherAssert.assertThat(fileCache.usage(), equalTo((long) EIGHT_MB)); + + // start tripping the circuit breaker + testCircuitBreaker.startBreaking(); + + // fetch blob which already had entry in file cache, should not encounter circuit breaking exceptions + try (IndexInput i = fetchBlobWithName("1")) { + assertIndexInputIsFunctional(i); + MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB)); + } + + // fetch new blob - should encounter circuit breaking exception + expectThrows(CircuitBreakingException.class, () -> fetchBlobWithName("2")); + } + public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception { // Mock a call for a blob that will block until the latch is released, // then start the fetch for that blob on a separate thread From c4f8e869e5e3ca9e789323f74a6180e9a71e4c4f Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Thu, 3 Jul 2025 15:10:39 +0530 Subject: [PATCH 2/5] Change compute to put to prevent unnecessary cache misses Signed-off-by: Shreyansh Ray --- .../opensearch/index/store/remote/utils/TransferManager.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 4e421dea96d95..c6d2dc5e6c1c4 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -76,10 +76,7 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { CachedIndexInput cacheEntry = fileCache.get(key); if (cacheEntry == null || cacheEntry.isClosed()) { - cacheEntry = fileCache.compute( - key, - (path, cachedIndexInput) -> new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest) - ); + cacheEntry = fileCache.put(key, new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest)); } // Cache entry was either retrieved from the cache or newly added, either From e280fe6513c761914a6272264aa9c5e3d6a7222a Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Thu, 3 Jul 2025 17:43:42 +0530 Subject: [PATCH 3/5] Fix test failures Signed-off-by: Shreyansh Ray --- .../opensearch/index/store/remote/utils/TransferManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index c6d2dc5e6c1c4..b07573b6f262d 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -76,7 +76,8 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { CachedIndexInput cacheEntry = fileCache.get(key); if (cacheEntry == null || cacheEntry.isClosed()) { - cacheEntry = fileCache.put(key, new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest)); + cacheEntry = new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); + fileCache.put(key, cacheEntry); } // Cache entry was either retrieved from the cache or newly added, either From 92810797e5c55522901beaa167831b008f1f2cd2 Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Tue, 15 Jul 2025 14:48:05 +0530 Subject: [PATCH 4/5] Check circuit breaker before put/compute operations in FileCache Signed-off-by: Shreyansh Ray --- .../store/remote/filecache/FileCache.java | 8 +++----- .../store/remote/filecache/FileCacheTests.java | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java index 718f79a74f9d7..b8b1e3c650b1b 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java @@ -69,8 +69,8 @@ public long capacity() { @Override public CachedIndexInput put(Path filePath, CachedIndexInput indexInput) { + checkParentBreaker(); CachedIndexInput cachedIndexInput = theCache.put(filePath, indexInput); - checkParentBreaker(filePath); return cachedIndexInput; } @@ -79,8 +79,8 @@ public CachedIndexInput compute( Path key, BiFunction remappingFunction ) { + checkParentBreaker(); CachedIndexInput cachedIndexInput = theCache.compute(key, remappingFunction); - checkParentBreaker(key); return cachedIndexInput; } @@ -201,13 +201,11 @@ public void closeIndexInputReferences() { /** * Ensures that the PARENT breaker is not tripped when an entry is added to the cache - * @param filePath the path key for which entry is added */ - private void checkParentBreaker(Path filePath) { + private void checkParentBreaker() { try { circuitBreaker.addEstimateBytesAndMaybeBreak(0, "filecache_entry"); } catch (CircuitBreakingException ex) { - theCache.remove(filePath); throw new CircuitBreakingException( "Unable to create file cache entries", ex.getBytesWanted(), diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java index 643caa85b5862..861b4c009fc87 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java @@ -46,6 +46,10 @@ private FileCache createFileCache(long capacity) { return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, new NoopCircuitBreaker(CircuitBreaker.REQUEST)); } + private FileCache createFileCache(long capacity, CircuitBreaker circuitBreaker) { + return FileCacheFactory.createConcurrentLRUFileCache(capacity, CONCURRENCY_LEVEL, circuitBreaker); + } + private FileCache createCircuitBreakingFileCache(long capacity) { TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker(); testCircuitBreaker.startBreaking(); @@ -200,6 +204,20 @@ public void testComputeThrowCircuitBreakingException() { assertNull(fileCache.get(path)); } + public void testEntryNotRemovedCircuitBreaker() { + TestCircuitBreaker circuitBreaker = new TestCircuitBreaker(); + FileCache fileCache = createFileCache(MEGA_BYTES, circuitBreaker); + Path path = createPath("0"); + fileCache.put(path, new StubCachedIndexInput(8 * MEGA_BYTES)); + // put should succeed since circuit breaker hasn't tripped yet + assertEquals(fileCache.get(path).length(), 8 * MEGA_BYTES); + circuitBreaker.startBreaking(); + // compute should throw CircuitBreakingException but shouldn't remove entry already present + assertThrows(CircuitBreakingException.class, () -> fileCache.compute(path, (p, i) -> new StubCachedIndexInput(2 * MEGA_BYTES))); + assertNotNull(fileCache.get(path)); + assertEquals(fileCache.get(path).length(), 8 * MEGA_BYTES); + } + public void testRemove() { FileCache fileCache = createFileCache(MEGA_BYTES); for (int i = 0; i < 4; i++) { From c071c2eec8d043c03f00b51cbb6d004a133da3b4 Mon Sep 17 00:00:00 2001 From: Shreyansh Ray Date: Tue, 15 Jul 2025 14:51:02 +0530 Subject: [PATCH 5/5] Revert "Fix test failures" This reverts commit e280fe6513c761914a6272264aa9c5e3d6a7222a. Revert "Change compute to put to prevent unnecessary cache misses" This reverts commit c4f8e869e5e3ca9e789323f74a6180e9a71e4c4f. Revert "Fix fetchBlob in TransferManager to compute from file cache only if entry is not present" This reverts commit bf175e77fa00d8b6c8ab498d4046eaca1e51bf2d. Signed-off-by: Shreyansh Ray --- .../store/remote/utils/TransferManager.java | 16 ++++++--- .../remote/utils/TransferManagerTestCase.java | 34 ++++--------------- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index b07573b6f262d..d7f484bb26a79 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -74,11 +74,17 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio try { return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { - CachedIndexInput cacheEntry = fileCache.get(key); - if (cacheEntry == null || cacheEntry.isClosed()) { - cacheEntry = new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); - fileCache.put(key, cacheEntry); - } + CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { + if (cachedIndexInput == null || cachedIndexInput.isClosed()) { + logger.trace("Transfer Manager - IndexInput closed or not in cache"); + // Doesn't exist or is closed, either way create a new one + return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); + } else { + logger.trace("Transfer Manager - Already in cache"); + // already in the cache and ready to be used (open) + return cachedIndexInput; + } + }); // Cache entry was either retrieved from the cache or newly added, either // way the reference count has been incremented by one. We can only diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java index 7d6bcd4b58363..668eac51b1b81 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java @@ -13,8 +13,8 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.SimpleFSLockFactory; -import org.opensearch.common.breaker.TestCircuitBreaker; -import org.opensearch.core.common.breaker.CircuitBreakingException; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheFactory; @@ -38,8 +38,11 @@ @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public abstract class TransferManagerTestCase extends OpenSearchTestCase { protected static final int EIGHT_MB = 1024 * 1024 * 8; - protected final TestCircuitBreaker testCircuitBreaker = new TestCircuitBreaker(); - protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(EIGHT_MB * 2, 1, testCircuitBreaker); + protected final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache( + EIGHT_MB * 2, + 1, + new NoopCircuitBreaker(CircuitBreaker.REQUEST) + ); protected MMapDirectory directory; protected TransferManager transferManager; @@ -153,29 +156,6 @@ public void testDownloadFails() throws Exception { MatcherAssert.assertThat(fileCache.usage(), equalTo(0L)); } - public void testCircuitBreakerWhileDownloading() throws IOException { - // fetch blob when circuit breaking is not tripping - try (IndexInput i = fetchBlobWithName("1")) { - assertIndexInputIsFunctional(i); - MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB)); - } - // should have entry in file cache - MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L)); - MatcherAssert.assertThat(fileCache.usage(), equalTo((long) EIGHT_MB)); - - // start tripping the circuit breaker - testCircuitBreaker.startBreaking(); - - // fetch blob which already had entry in file cache, should not encounter circuit breaking exceptions - try (IndexInput i = fetchBlobWithName("1")) { - assertIndexInputIsFunctional(i); - MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB)); - } - - // fetch new blob - should encounter circuit breaking exception - expectThrows(CircuitBreakingException.class, () -> fetchBlobWithName("2")); - } - public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception { // Mock a call for a blob that will block until the latch is released, // then start the fetch for that blob on a separate thread