Skip to content

Commit 5c50f5f

Browse files
committed
Separate acquiring safe commit and last commit (#28271)
Previously we introduced a new parameter to `acquireIndexCommit` to allow acquire either a safe commit or a last commit. However with the new parameters, callers can provide a nonsense combination - flush first but acquire the safe commit. This commit separates acquireIndexCommit method into two different methods to avoid that problem. Moreover, this change should also improve the readability. Relates #28038
1 parent f05c6be commit 5c50f5f

File tree

11 files changed

+51
-24
lines changed

11 files changed

+51
-24
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -868,13 +868,17 @@ public void forceMerge(boolean flush) throws IOException {
868868
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
869869

870870
/**
871-
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
871+
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
872872
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
873873
*
874-
* @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
875874
* @param flushFirst indicates whether the engine should flush before returning the snapshot
876875
*/
877-
public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
876+
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;
877+
878+
/**
879+
* Snapshots the most recent safe index commit from the engine.
880+
*/
881+
public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;
878882

879883
/**
880884
* fail engine due to some error. the engine will also be closed.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1776,16 +1776,22 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
17761776
}
17771777

17781778
@Override
1779-
public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
1779+
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
17801780
// we have to flush outside of the readlock otherwise we might have a problem upgrading
17811781
// the to a write lock when we fail the engine in this operation
17821782
if (flushFirst) {
17831783
logger.trace("start flush for snapshot");
17841784
flush(false, true);
17851785
logger.trace("finish flush for snapshot");
17861786
}
1787-
final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
1788-
return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
1787+
final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
1788+
return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit));
1789+
}
1790+
1791+
@Override
1792+
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
1793+
final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
1794+
return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit));
17891795
}
17901796

17911797
private boolean failOnTragicEvent(AlreadyClosedException ex) {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,22 +1093,34 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
10931093
}
10941094

10951095
/**
1096-
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
1096+
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
10971097
* commit won't be freed until the commit / snapshot is closed.
10981098
*
1099-
* @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
11001099
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
11011100
*/
1102-
public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
1103-
IndexShardState state = this.state; // one time volatile read
1101+
public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
1102+
final IndexShardState state = this.state; // one time volatile read
11041103
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
11051104
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
1106-
return getEngine().acquireIndexCommit(safeCommit, flushFirst);
1105+
return getEngine().acquireLastIndexCommit(flushFirst);
11071106
} else {
11081107
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
11091108
}
11101109
}
11111110

1111+
/**
1112+
* Snapshots the most recent safe index commit from the currently running engine.
1113+
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
1114+
*/
1115+
public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
1116+
final IndexShardState state = this.state; // one time volatile read
1117+
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
1118+
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
1119+
return getEngine().acquireSafeIndexCommit();
1120+
} else {
1121+
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
1122+
}
1123+
}
11121124

11131125
/**
11141126
* gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
@@ -1137,7 +1149,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
11371149
return store.getMetadata(null, true);
11381150
}
11391151
}
1140-
indexCommit = engine.acquireIndexCommit(false, false);
1152+
indexCommit = engine.acquireLastIndexCommit(false);
11411153
return store.getMetadata(indexCommit.getIndexCommit());
11421154
} finally {
11431155
store.decRef();

server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
4848
store.incRef();
4949
boolean success = false;
5050
try {
51-
indexCommit = shard.acquireIndexCommit(false, true);
51+
indexCommit = shard.acquireLastIndexCommit(true);
5252
success = true;
5353
} finally {
5454
if (success == false) {

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ final void ensureOpen() {
238238
*
239239
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
240240
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
241-
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
241+
* {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
242242
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
243243
* directory
244244
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
@@ -262,7 +262,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
262262
*
263263
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
264264
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
265-
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
265+
* {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
266266
*
267267
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
268268
* directory

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
159159
} else {
160160
final Engine.IndexCommitRef phase1Snapshot;
161161
try {
162-
phase1Snapshot = shard.acquireIndexCommit(true, false);
162+
phase1Snapshot = shard.acquireSafeIndexCommit();
163163
} catch (final Exception e) {
164164
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
165165
}

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
176176
/**
177177
* Creates a snapshot of the shard based on the index commit point.
178178
* <p>
179-
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method.
179+
* The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
180180
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
181181
* <p>
182182
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
408408
final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
409409
try {
410410
// we flush first to make sure we get the latest writes snapshotted
411-
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
411+
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
412412
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
413413
if (logger.isDebugEnabled()) {
414414
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2139,7 +2139,7 @@ public void testConcurrentWritesAndCommits() throws Exception {
21392139
boolean doneIndexing;
21402140
do {
21412141
doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS);
2142-
commits.add(engine.acquireIndexCommit(false, true));
2142+
commits.add(engine.acquireLastIndexCommit(true));
21432143
if (commits.size() > commitLimit) { // don't keep on piling up too many commits
21442144
IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1)));
21452145
// we increase the wait time to make sure we eventually if things are slow wait for threads to finish.
@@ -4373,19 +4373,24 @@ public void testAcquireIndexCommit() throws Exception {
43734373
}
43744374
final boolean flushFirst = randomBoolean();
43754375
final boolean safeCommit = randomBoolean();
4376-
Engine.IndexCommitRef commit = engine.acquireIndexCommit(safeCommit, flushFirst);
4376+
final Engine.IndexCommitRef snapshot;
4377+
if (safeCommit) {
4378+
snapshot = engine.acquireSafeIndexCommit();
4379+
} else {
4380+
snapshot = engine.acquireLastIndexCommit(flushFirst);
4381+
}
43774382
int moreDocs = between(1, 20);
43784383
for (int i = 0; i < moreDocs; i++) {
43794384
index(engine, numDocs + i);
43804385
}
43814386
globalCheckpoint.set(numDocs + moreDocs - 1);
43824387
engine.flush();
43834388
// check that we can still read the commit that we captured
4384-
try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) {
4389+
try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) {
43854390
assertThat(reader.numDocs(), equalTo(flushFirst && (safeCommit == false || inSync) ? numDocs : 0));
43864391
}
43874392
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
4388-
commit.close();
4393+
snapshot.close();
43894394
// check it's clean up
43904395
engine.flush(true, true);
43914396
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
397397
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
398398
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
399399
when(shard.state()).thenReturn(IndexShardState.RELOCATED);
400-
when(shard.acquireIndexCommit(anyBoolean(), anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
400+
when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class));
401401
doAnswer(invocation -> {
402402
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
403403
return null;

0 commit comments

Comments
 (0)