Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for missing proto fields in GRPC FunctionScore and Highlight ([#20169](https://github.com/opensearch-project/OpenSearch/pull/20169))

### Fixed
- Fix bug of warm index: FullFileCachedIndexInput was closed error ([#20055](https://github.com/opensearch-project/OpenSearch/pull/20055))
- Fix flaky test ClusterMaxMergesAtOnceIT.testClusterLevelDefaultUpdatesMergePolicy ([#18056](https://github.com/opensearch-project/OpenSearch/issues/18056))
- Fix bug in Assertion framework(Yaml Rest test): numeric comparison fails when comparing Integer vs Long (or Float vs Double) ([#19376](https://github.com/opensearch-project/OpenSearch/pull/19376))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Reference Counted IndexInput. The first FileCachedIndexInput for a file/block is called origin.
Expand Down Expand Up @@ -41,7 +42,7 @@ public class FileCachedIndexInput extends IndexInput implements RandomAccessInpu
/** indicates if this IndexInput instance is a clone or not */
protected final boolean isClone;

protected volatile boolean closed = false;
protected final AtomicBoolean closed = new AtomicBoolean(false);

public FileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) {
this(cache, filePath, underlyingIndexInput, false);
Expand Down Expand Up @@ -139,7 +140,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw

@Override
public void close() throws IOException {
if (!closed) {
if (!closed.get()) {
// if the underlying lucene index input is a clone,
// the following line won't close/unmap the file.
luceneIndexInput.close();
Expand All @@ -148,7 +149,7 @@ public void close() throws IOException {
if (isClone) {
cache.decRef(filePath);
}
closed = true;
closed.set(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Extension of {@link FileCachedIndexInput} for full files for handling clones and slices
Expand All @@ -37,7 +38,7 @@ public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput under

public FullFileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput, boolean isClone) {
super(cache, filePath, underlyingIndexInput, isClone);
indexInputHolder = new IndexInputHolder(underlyingIndexInput, isClone, cache, filePath);
indexInputHolder = new IndexInputHolder(closed, underlyingIndexInput, isClone, cache, filePath);
CLEANER.register(this, indexInputHolder);
}

Expand Down Expand Up @@ -83,7 +84,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw
*/
@Override
public void close() throws IOException {
if (!closed) {
if (!closed.get()) {
if (isClone) {
cache.decRef(filePath);
}
Expand All @@ -93,17 +94,26 @@ public void close() throws IOException {
logger.trace("FullFileCachedIndexInput already closed");
}
luceneIndexInput = null;
closed = true;
closed.set(true);
}
}

/**
* Run resource cleaning,To be used only in test
*/
public void indexInputHolderRun() {
indexInputHolder.run();
}

private static class IndexInputHolder implements Runnable {
private final AtomicBoolean closed;
private final IndexInput indexInput;
private final FileCache cache;
private final boolean isClone;
private final Path path;

IndexInputHolder(IndexInput indexInput, boolean isClone, FileCache cache, Path path) {
IndexInputHolder(AtomicBoolean closed, IndexInput indexInput, boolean isClone, FileCache cache, Path path) {
this.closed = closed;
this.indexInput = indexInput;
this.isClone = isClone;
this.cache = cache;
Expand All @@ -113,8 +123,11 @@ private static class IndexInputHolder implements Runnable {
@Override
public void run() {
try {
indexInput.close();
if (isClone) cache.decRef(path);
if (!closed.get()) {
indexInput.close();
if (isClone) cache.decRef(path);
closed.set(true);
}
} catch (IOException e) {
logger.error("Failed to close IndexInput while clearing phantom reachable object");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,40 @@ public void testSlice() throws IOException {
assertFalse(isActiveAndTotalUsageSame());
}

public void testClose() throws IOException {
setupIndexInputAndAddToFileCache();

// Since the file is already in cache and has refCount 1, activeUsage and totalUsage will be same
assertTrue(isActiveAndTotalUsageSame());

fileCache.decRef(filePath);

// 3 Clones
FullFileCachedIndexInput indexInputClone1 = fullFileCachedIndexInput.clone();
FullFileCachedIndexInput indexInputClone2 = fullFileCachedIndexInput.clone();
FullFileCachedIndexInput indexInputClone3 = fullFileCachedIndexInput.clone();

assertEquals((int) fileCache.getRef(filePath), 3);
// Close Clone1, refCount -1
indexInputClone1.close();
assertEquals((int) fileCache.getRef(filePath), 2);
// Mock GC resource cleaning, but the deRef function will not be called again.
indexInputClone1.indexInputHolderRun();
assertEquals((int) fileCache.getRef(filePath), 2);

// Mock GC resource cleaning, refCount -1
indexInputClone2.indexInputHolderRun();
assertEquals((int) fileCache.getRef(filePath), 1);
// Close Clone2, but the deRef function will not be called again.
indexInputClone2.close();
assertEquals((int) fileCache.getRef(filePath), 1);

indexInputClone3.close();
assertEquals((int) fileCache.getRef(filePath), 0);
indexInputClone3.indexInputHolderRun();
assertEquals((int) fileCache.getRef(filePath), 0);
}

private void triggerGarbageCollectionAndAssertClonesClosed() {
try {
// Clones/Slices will be phantom reachable now, triggering gc should call close on them
Expand Down
Loading