diff --git a/CHANGELOG.md b/CHANGELOG.md index 234f0e6570fbf..9c63823d5d315 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for missing proto fields in GRPC FunctionScore and Highlight ([#20169](https://github.com/opensearch-project/OpenSearch/pull/20169)) ### Fixed +- Fix bug of warm index: FullFileCachedIndexInput was closed error ([#20055](https://github.com/opensearch-project/OpenSearch/pull/20055)) - Fix flaky test ClusterMaxMergesAtOnceIT.testClusterLevelDefaultUpdatesMergePolicy ([#18056](https://github.com/opensearch-project/OpenSearch/issues/18056)) - Fix bug in Assertion framework(Yaml Rest test): numeric comparison fails when comparing Integer vs Long (or Float vs Double) ([#19376](https://github.com/opensearch-project/OpenSearch/pull/19376)) diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java index ab6f5f931da0f..fba8dacc1e99e 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCachedIndexInput.java @@ -13,6 +13,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicBoolean; /** * Reference Counted IndexInput. The first FileCachedIndexInput for a file/block is called origin. @@ -41,7 +42,7 @@ public class FileCachedIndexInput extends IndexInput implements RandomAccessInpu /** indicates if this IndexInput instance is a clone or not */ protected final boolean isClone; - protected volatile boolean closed = false; + protected final AtomicBoolean closed = new AtomicBoolean(false); public FileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) { this(cache, filePath, underlyingIndexInput, false); @@ -139,7 +140,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw @Override public void close() throws IOException { - if (!closed) { + if (!closed.get()) { // if the underlying lucene index input is a clone, // the following line won't close/unmap the file. luceneIndexInput.close(); @@ -148,7 +149,7 @@ public void close() throws IOException { if (isClone) { cache.decRef(filePath); } - closed = true; + closed.set(true); } } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java index deb8f437bfd63..78ef14b5a9698 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInput.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.lang.ref.Cleaner; import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicBoolean; /** * Extension of {@link FileCachedIndexInput} for full files for handling clones and slices @@ -37,7 +38,7 @@ public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput under public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput, boolean isClone) { super(cache, filePath, underlyingIndexInput, isClone); - indexInputHolder = new IndexInputHolder(underlyingIndexInput, isClone, cache, filePath); + indexInputHolder = new IndexInputHolder(closed, underlyingIndexInput, isClone, cache, filePath); CLEANER.register(this, indexInputHolder); } @@ -83,7 +84,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw */ @Override public void close() throws IOException { - if (!closed) { + if (!closed.get()) { if (isClone) { cache.decRef(filePath); } @@ -93,17 +94,26 @@ public void close() throws IOException { logger.trace("FullFileCachedIndexInput already closed"); } luceneIndexInput = null; - closed = true; + closed.set(true); } } + /** + * Run resource cleaning,To be used only in test + */ + public void indexInputHolderRun() { + indexInputHolder.run(); + } + private static class IndexInputHolder implements Runnable { + private final AtomicBoolean closed; private final IndexInput indexInput; private final FileCache cache; private final boolean isClone; private final Path path; - IndexInputHolder(IndexInput indexInput, boolean isClone, FileCache cache, Path path) { + IndexInputHolder(AtomicBoolean closed, IndexInput indexInput, boolean isClone, FileCache cache, Path path) { + this.closed = closed; this.indexInput = indexInput; this.isClone = isClone; this.cache = cache; @@ -113,8 +123,11 @@ private static class IndexInputHolder implements Runnable { @Override public void run() { try { - indexInput.close(); - if (isClone) cache.decRef(path); + if (!closed.get()) { + indexInput.close(); + if (isClone) cache.decRef(path); + closed.set(true); + } } catch (IOException e) { logger.error("Failed to close IndexInput while clearing phantom reachable object"); } diff --git a/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java index bc646cc8d50db..cfeb886a5df8a 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/filecache/FullFileCachedIndexInputTests.java @@ -78,6 +78,40 @@ public void testSlice() throws IOException { assertFalse(isActiveAndTotalUsageSame()); } + public void testClose() throws IOException { + setupIndexInputAndAddToFileCache(); + + // Since the file is already in cache and has refCount 1, activeUsage and totalUsage will be same + assertTrue(isActiveAndTotalUsageSame()); + + fileCache.decRef(filePath); + + // 3 Clones + FullFileCachedIndexInput indexInputClone1 = fullFileCachedIndexInput.clone(); + FullFileCachedIndexInput indexInputClone2 = fullFileCachedIndexInput.clone(); + FullFileCachedIndexInput indexInputClone3 = fullFileCachedIndexInput.clone(); + + assertEquals((int) fileCache.getRef(filePath), 3); + // Close Clone1, refCount -1 + indexInputClone1.close(); + assertEquals((int) fileCache.getRef(filePath), 2); + // Mock GC resource cleaning, but the deRef function will not be called again. + indexInputClone1.indexInputHolderRun(); + assertEquals((int) fileCache.getRef(filePath), 2); + + // Mock GC resource cleaning, refCount -1 + indexInputClone2.indexInputHolderRun(); + assertEquals((int) fileCache.getRef(filePath), 1); + // Close Clone2, but the deRef function will not be called again. + indexInputClone2.close(); + assertEquals((int) fileCache.getRef(filePath), 1); + + indexInputClone3.close(); + assertEquals((int) fileCache.getRef(filePath), 0); + indexInputClone3.indexInputHolderRun(); + assertEquals((int) fileCache.getRef(filePath), 0); + } + private void triggerGarbageCollectionAndAssertClonesClosed() { try { // Clones/Slices will be phantom reachable now, triggering gc should call close on them