Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -663,7 +664,15 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
}
Releasable releasable = store::decRef;
try {
EngineSearcher engineSearcher = new EngineSearcher(source, getReferenceManager(scope), store, logger);
ReferenceManager<IndexSearcher> referenceManager = getReferenceManager(scope);
Searcher engineSearcher = new Searcher(source, referenceManager.acquire(),
s -> {
try {
referenceManager.release(s);
} finally {
store.decRef();
}
}, logger);
releasable = null; // success - hand over the reference to the engine searcher
return engineSearcher;
} catch (AlreadyClosedException ex) {
Expand Down Expand Up @@ -1167,40 +1176,67 @@ default void onFailedEngine(String reason, @Nullable Exception e) {
}

public static class Searcher implements Releasable {

private final String source;
private final IndexSearcher searcher;
private final AtomicBoolean released = new AtomicBoolean(false);
private final Logger logger;
private final IOUtils.IOConsumer<IndexSearcher> onClose;

public Searcher(String source, IndexSearcher searcher, Logger logger) {
this(source, searcher, s -> s.getIndexReader().close(), logger);
}

public Searcher(String source, IndexSearcher searcher) {
public Searcher(String source, IndexSearcher searcher, IOUtils.IOConsumer<IndexSearcher> onClose, Logger logger) {
this.source = source;
this.searcher = searcher;
this.onClose = onClose;
this.logger = logger;
}

/**
* The source that caused this searcher to be acquired.
*/
public String source() {
public final String source() {
return source;
}

public IndexReader reader() {
public final IndexReader reader() {
return searcher.getIndexReader();
}

public DirectoryReader getDirectoryReader() {
public final DirectoryReader getDirectoryReader() {
if (reader() instanceof DirectoryReader) {
return (DirectoryReader) reader();
}
throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader");
}

public IndexSearcher searcher() {
public final IndexSearcher searcher() {
return searcher;
}

@Override
public void close() {
// Nothing to close here
if (!released.compareAndSet(false, true)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMG == false! 😱

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can push a fix for this oversight directly, dismiss my disapproving review, and merge. 😇

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a commit with an appropriate commit message

/* In general, searchers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount
* of time, this is why we only log a warning instead of throwing an exception.
*/
logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
return;
}
try {
onClose.accept(searcher());
} catch (IOException e) {
throw new IllegalStateException("Cannot close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
}
}

public final Logger getLogger() {
return logger;
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;

import java.io.IOException;
Expand Down Expand Up @@ -97,21 +97,10 @@ public final Engine.Searcher wrap(Engine.Searcher engineSearcher) throws IOExcep
if (reader == nonClosingReaderWrapper && indexSearcher == innerIndexSearcher) {
return engineSearcher;
} else {
return new Engine.Searcher(engineSearcher.source(), indexSearcher) {
@Override
public void close() throws ElasticsearchException {
try {
reader().close();
// we close the reader to make sure wrappers can release resources if needed....
// our NonClosingReaderWrapper makes sure that our reader is not closed
} catch (IOException e) {
throw new ElasticsearchException("failed to close reader", e);
} finally {
engineSearcher.close();
}

}
};
// we close the reader to make sure wrappers can release resources if needed....
// our NonClosingReaderWrapper makes sure that our reader is not closed
return new Engine.Searcher(engineSearcher.source(), indexSearcher, s -> IOUtils.close(s.getIndexReader(), engineSearcher),
engineSearcher.getLogger());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ class AssertingSearcher extends Engine.Searcher {
private final Logger logger;
private final AtomicBoolean closed = new AtomicBoolean(false);

AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher,
ShardId shardId,
Logger logger) {
super(wrappedSearcher.source(), indexSearcher);
AssertingSearcher(IndexSearcher indexSearcher, final Engine.Searcher wrappedSearcher, ShardId shardId, Logger logger) {
super(wrappedSearcher.source(), indexSearcher, s -> {throw new AssertionError();}, logger);
// we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
// with a wrapped reader.
this.wrappedSearcher = wrappedSearcher;
Expand All @@ -52,11 +50,6 @@ class AssertingSearcher extends Engine.Searcher {
"IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
}

@Override
public String source() {
return wrappedSearcher.source();
}

@Override
public void close() {
synchronized (lock) {
Expand Down