diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 7feaeb63ac36f..cc4d30436e492 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -862,13 +862,17 @@ public void forceMerge(boolean flush) throws IOException { public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException; /** - * Snapshots the index and returns a handle to it. If needed will try and "commit" the + * Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the * lucene index to make sure we have a "fresh" copy of the files to snapshot. * - * @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit. * @param flushFirst indicates whether the engine should flush before returning the snapshot */ - public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException; + public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException; + + /** + * Snapshots the most recent safe index commit from the engine. + */ + public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException; /** * fail engine due to some error. the engine will also be closed. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 97a6403ec3b23..28965ae287b92 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1687,7 +1687,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu } @Override - public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException { + public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException { // we have to flush outside of the readlock otherwise we might have a problem upgrading // the to a write lock when we fail the engine in this operation if (flushFirst) { @@ -1695,8 +1695,14 @@ public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flush(false, true); logger.trace("finish flush for snapshot"); } - final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit); - return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit)); + final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false); + return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit)); + } + + @Override + public IndexCommitRef acquireSafeIndexCommit() throws EngineException { + final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true); + return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit)); } private boolean failOnTragicEvent(AlreadyClosedException ex) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3ace9ededc5b3..5eaba1592e0b9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1083,22 +1083,34 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() { } /** - * Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this + * Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this * commit won't be freed until the commit / snapshot is closed. * - * @param safeCommit true capture the most recent safe commit point; otherwise the most recent commit point. * @param flushFirst true if the index should first be flushed to disk / a low level lucene commit should be executed */ - public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException { - IndexShardState state = this.state; // one time volatile read + public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException { + final IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { - return getEngine().acquireIndexCommit(safeCommit, flushFirst); + return getEngine().acquireLastIndexCommit(flushFirst); } else { throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); } } + /** + * Snapshots the most recent safe index commit from the currently running engine. + * All index files referenced by this index commit won't be freed until the commit/snapshot is closed. + */ + public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException { + final IndexShardState state = this.state; // one time volatile read + // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine + if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { + return getEngine().acquireSafeIndexCommit(); + } else { + throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); + } + } /** * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard, @@ -1127,7 +1139,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException { return store.getMetadata(null, true); } } - indexCommit = engine.acquireIndexCommit(false, false); + indexCommit = engine.acquireLastIndexCommit(false); return store.getMetadata(indexCommit.getIndexCommit()); } finally { store.decRef(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index f8f92fbb5fa8b..d7105c0c14d38 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable { store.incRef(); boolean success = false; try { - indexCommit = shard.acquireIndexCommit(false, true); + indexCommit = shard.acquireLastIndexCommit(true); success = true; } finally { if (success == false) { diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 7aab2c750d139..3220f61622469 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -238,7 +238,7 @@ final void ensureOpen() { * * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard - * {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed + * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * @param commit the index commit to read the snapshot from or null if the latest snapshot should be read from the * directory * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an @@ -262,7 +262,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException { * * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard - * {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed + * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * * @param commit the index commit to read the snapshot from or null if the latest snapshot should be read from the * directory diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 5a0ee1cf44d07..68c90194bb80f 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -159,7 +159,7 @@ public RecoveryResponse recoverToTarget() throws IOException { } else { final Engine.IndexCommitRef phase1Snapshot; try { - phase1Snapshot = shard.acquireIndexCommit(true, false); + phase1Snapshot = shard.acquireSafeIndexCommit(); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 4c3d58e67ff72..c8f830c461129 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -176,7 +176,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long /** * Creates a snapshot of the shard based on the index commit point. *

- * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method. + * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method. * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 7e2a7aab27743..df955c2e3b63d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -390,7 +390,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); try { // we flush first to make sure we get the latest writes snapshotted - try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) { + try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) { repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); if (logger.isDebugEnabled()) { final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2a7e49aa66b61..86f6e1b92dff6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2118,7 +2118,7 @@ public void testConcurrentWritesAndCommits() throws Exception { boolean doneIndexing; do { doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS); - commits.add(engine.acquireIndexCommit(false, true)); + commits.add(engine.acquireLastIndexCommit(true)); if (commits.size() > commitLimit) { // don't keep on piling up too many commits IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1))); // we increase the wait time to make sure we eventually if things are slow wait for threads to finish. @@ -4343,7 +4343,12 @@ public void testAcquireIndexCommit() throws Exception { } final boolean flushFirst = randomBoolean(); final boolean safeCommit = randomBoolean(); - Engine.IndexCommitRef commit = engine.acquireIndexCommit(safeCommit, flushFirst); + final Engine.IndexCommitRef snapshot; + if (safeCommit) { + snapshot = engine.acquireSafeIndexCommit(); + } else { + snapshot = engine.acquireLastIndexCommit(flushFirst); + } int moreDocs = between(1, 20); for (int i = 0; i < moreDocs; i++) { index(engine, numDocs + i); @@ -4351,11 +4356,11 @@ public void testAcquireIndexCommit() throws Exception { globalCheckpoint.set(numDocs + moreDocs - 1); engine.flush(); // check that we can still read the commit that we captured - try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) { + try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) { assertThat(reader.numDocs(), equalTo(flushFirst && (safeCommit == false || inSync) ? numDocs : 0)); } assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2)); - commit.close(); + snapshot.close(); // check it's clean up engine.flush(true, true); assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 7ab6925ce57b9..2e4a9a65d79b6 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -396,7 +396,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.state()).thenReturn(IndexShardState.RELOCATED); - when(shard.acquireIndexCommit(anyBoolean(), anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); + when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class)); doAnswer(invocation -> { ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index cd55c1126eb1c..f709222ca589c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -620,7 +620,7 @@ protected void snapshotShard(final IndexShard shard, final Snapshot snapshot, final Repository repository) throws IOException { final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); - try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(false, true)) { + try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { Index index = shard.shardId().getIndex(); IndexId indexId = new IndexId(index.getName(), index.getUUID());