-
Notifications
You must be signed in to change notification settings - Fork 25.8k
Tighten up concurrent store metadata listing and engine writes #19684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
dbe0f01
5eca202
ae49df4
f4e2122
c1981e9
609bb3b
6c694f5
b5f75b4
852b5ac
7bb6085
a055615
ebf5231
ff9df4c
18dad67
66bc91d
a4b275d
dec2a26
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,14 +21,19 @@ | |
|
|
||
| import org.apache.lucene.codecs.PostingsFormat; | ||
| import org.apache.lucene.index.CheckIndex; | ||
| import org.apache.lucene.index.CorruptIndexException; | ||
| import org.apache.lucene.index.IndexCommit; | ||
| import org.apache.lucene.index.IndexFormatTooNewException; | ||
| import org.apache.lucene.index.IndexFormatTooOldException; | ||
| import org.apache.lucene.index.IndexWriter; | ||
| import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; | ||
| import org.apache.lucene.index.SnapshotDeletionPolicy; | ||
| import org.apache.lucene.index.Term; | ||
| import org.apache.lucene.search.Query; | ||
| import org.apache.lucene.search.QueryCachingPolicy; | ||
| import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; | ||
| import org.apache.lucene.store.AlreadyClosedException; | ||
| import org.apache.lucene.store.Lock; | ||
| import org.apache.lucene.util.IOUtils; | ||
| import org.apache.lucene.util.ThreadInterruptedException; | ||
| import org.elasticsearch.ElasticsearchException; | ||
|
|
@@ -116,10 +121,12 @@ | |
| import org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
|
|
||
| import java.io.FileNotFoundException; | ||
| import java.io.IOException; | ||
| import java.io.PrintStream; | ||
| import java.nio.channels.ClosedByInterruptException; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.nio.file.NoSuchFileException; | ||
| import java.util.ArrayList; | ||
| import java.util.EnumSet; | ||
| import java.util.List; | ||
|
|
@@ -789,29 +796,69 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() { | |
|
|
||
| /** | ||
| * Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this | ||
| * commit won't be freed until the commit / snapshot is released via {@link #releaseSnapshot(IndexCommit)}. | ||
| * commit won't be freed until the commit / snapshot is released via {@link #releaseIndexCommit(IndexCommit)}. | ||
| * | ||
| * @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed | ||
| */ | ||
| public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException { | ||
| public IndexCommit acquireIndexCommit(boolean flushFirst) throws EngineException { | ||
| IndexShardState state = this.state; // one time volatile read | ||
| // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine | ||
| if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { | ||
| return getEngine().snapshotIndex(flushFirst); | ||
| return getEngine().acquireIndexCommit(flushFirst); | ||
| } else { | ||
| throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Releases a snapshot taken from {@link #snapshotIndex(boolean)} this must be called to release the resources | ||
| * Releases a snapshot taken from {@link #acquireIndexCommit(boolean)} this must be called to release the resources | ||
| * referenced by the given snapshot {@link IndexCommit}. | ||
| */ | ||
| public void releaseSnapshot(IndexCommit snapshot) throws IOException { | ||
| public void releaseIndexCommit(IndexCommit snapshot) throws IOException { | ||
| deletionPolicy.release(snapshot); | ||
| } | ||
|
|
||
| /** | ||
| * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard, | ||
| * without having to worry about the current state of the engine and concurrent flushes. | ||
| * | ||
| * @throws org.apache.lucene.index.IndexNotFoundException if no index is found in the current directory | ||
| * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an | ||
| * unexpected exception when opening the index reading the segments file. | ||
| * @throws IndexFormatTooOldException if the lucene index is too old to be opened. | ||
| * @throws IndexFormatTooNewException if the lucene index is too new to be opened. | ||
| * @throws FileNotFoundException if one or more files referenced by a commit are not present. | ||
| * @throws NoSuchFileException if one or more files referenced by a commit are not present. | ||
| */ | ||
| public Store.MetadataSnapshot snapshotStore() throws IOException { | ||
| synchronized (mutex) { | ||
| // if the engine is not running, we can access the store directly, but we need to make sure no one starts | ||
| // the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized. | ||
| // That can be done out of mutex, since the engine can be closed half way. | ||
| Engine engine = getEngineOrNull(); | ||
| if (engine == null) { | ||
| store.incRef(); | ||
|
||
| try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) { | ||
|
||
| return store.getMetadata(null); | ||
| } finally { | ||
| store.decRef(); | ||
| } | ||
| } | ||
| } | ||
| IndexCommit indexCommit = null; | ||
| store.incRef(); | ||
| try { | ||
| indexCommit = deletionPolicy.snapshot(); | ||
| return store.getMetadata(indexCommit); | ||
| } finally { | ||
| store.decRef(); | ||
| if (indexCommit != null) { | ||
| deletionPolicy.release(indexCommit); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Fails the shard and marks the shard store as corrupted if | ||
| * <code>e</code> is caused by index corruption | ||
|
|
@@ -1310,7 +1357,7 @@ private void doCheckIndex() throws IOException { | |
| if ("checksum".equals(checkIndexOnStartup)) { | ||
| // physical verification only: verify all checksums for the latest commit | ||
| IOException corrupt = null; | ||
| MetadataSnapshot metadata = store.getMetadata(); | ||
| MetadataSnapshot metadata = snapshotStore(); | ||
| for (Map.Entry<String, StoreFileMetaData> entry : metadata.asMap().entrySet()) { | ||
| try { | ||
| Store.checkIntegrity(entry.getValue(), store.directory()); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm but what happens if the engine is closed just before, or while, we call
deletionPolicy.snapshot()below?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the deletion policy is safe to use then, but I agree this is all very icky, but at least it's in one place now so we can improve.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK indeed I looked at
SnapshotDeletionPolicyand it looks OK if you pull a snapshot, and then IW closes (you get the last commit before IW closed, and any files it references will remain existing until the next IW is created), or if IW closes and you pull a snapshot (you get whatever IW committed on close). And its methods are sync'd.