Skip to content

Commit cdc6085

Browse files
committed
Primary send safe commit in file-based recovery (#28038)
Today a primary shard transfers the most recent commit point to a replica shard in a file-based recovery. However, the most recent commit may not be a "safe" commit; this causes a replica shard not having a safe commit point until it can retain a safe commit by itself. This commits collapses the snapshot deletion policy into the combined deletion policy and modifies the peer recovery source to send a safe commit. Relates #10708
1 parent ba770b5 commit cdc6085

File tree

14 files changed

+256
-92
lines changed

14 files changed

+256
-92
lines changed

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 102 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@
1919

2020
package org.elasticsearch.index.engine;
2121

22+
import com.carrotsearch.hppc.ObjectIntHashMap;
2223
import org.apache.lucene.index.IndexCommit;
2324
import org.apache.lucene.index.IndexDeletionPolicy;
25+
import org.apache.lucene.store.Directory;
2426
import org.elasticsearch.index.seqno.SequenceNumbers;
2527
import org.elasticsearch.index.translog.Translog;
2628
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
2729

2830
import java.io.IOException;
2931
import java.nio.file.Path;
32+
import java.util.Collection;
3033
import java.util.List;
3134
import java.util.Map;
3235
import java.util.function.LongSupplier;
@@ -42,12 +45,16 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
4245
private final TranslogDeletionPolicy translogDeletionPolicy;
4346
private final EngineConfig.OpenMode openMode;
4447
private final LongSupplier globalCheckpointSupplier;
48+
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
49+
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
50+
private IndexCommit lastCommit; // the most recent commit point
4551

4652
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
4753
LongSupplier globalCheckpointSupplier) {
4854
this.openMode = openMode;
4955
this.translogDeletionPolicy = translogDeletionPolicy;
5056
this.globalCheckpointSupplier = globalCheckpointSupplier;
57+
this.snapshottedCommits = new ObjectIntHashMap<>();
5158
}
5259

5360
@Override
@@ -70,18 +77,22 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
7077
}
7178

7279
@Override
73-
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
80+
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
7481
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
82+
lastCommit = commits.get(commits.size() - 1);
83+
safeCommit = commits.get(keptPosition);
7584
for (int i = 0; i < keptPosition; i++) {
76-
commits.get(i).delete();
85+
if (snapshottedCommits.containsKey(commits.get(i)) == false) {
86+
commits.get(i).delete();
87+
}
7788
}
78-
updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
89+
updateTranslogDeletionPolicy();
7990
}
8091

81-
private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
82-
assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
83-
final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
84-
92+
private void updateTranslogDeletionPolicy() throws IOException {
93+
assert Thread.holdsLock(this);
94+
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
95+
final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
8596
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
8697
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
8798

@@ -90,6 +101,34 @@ private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, f
90101
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
91102
}
92103

104+
/**
105+
* Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}.
106+
* Index files of the capturing commit point won't be released until the commit reference is closed.
107+
*
108+
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
109+
*/
110+
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
111+
assert safeCommit != null : "Safe commit is not initialized yet";
112+
assert lastCommit != null : "Last commit is not initialized yet";
113+
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
114+
snapshottedCommits.addTo(snapshotting, 1); // increase refCount
115+
return new SnapshotIndexCommit(snapshotting);
116+
}
117+
118+
/**
119+
* Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}.
120+
*/
121+
synchronized void releaseCommit(final IndexCommit snapshotCommit) {
122+
final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate;
123+
assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" +
124+
"snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]";
125+
final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount
126+
assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]";
127+
if (refCount == 0) {
128+
snapshottedCommits.remove(releasingCommit);
129+
}
130+
}
131+
93132
/**
94133
* Find a safe commit point from a list of existing commits based on the supplied global checkpoint.
95134
* The max sequence number of a safe commit point should be at most the global checkpoint.
@@ -149,4 +188,60 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
149188
*/
150189
return 0;
151190
}
191+
192+
/**
193+
* A wrapper of an index commit that prevents it from being deleted.
194+
*/
195+
private static class SnapshotIndexCommit extends IndexCommit {
196+
private final IndexCommit delegate;
197+
198+
SnapshotIndexCommit(IndexCommit delegate) {
199+
this.delegate = delegate;
200+
}
201+
202+
@Override
203+
public String getSegmentsFileName() {
204+
return delegate.getSegmentsFileName();
205+
}
206+
207+
@Override
208+
public Collection<String> getFileNames() throws IOException {
209+
return delegate.getFileNames();
210+
}
211+
212+
@Override
213+
public Directory getDirectory() {
214+
return delegate.getDirectory();
215+
}
216+
217+
@Override
218+
public void delete() {
219+
throw new UnsupportedOperationException("A snapshot commit does not support deletion");
220+
}
221+
222+
@Override
223+
public boolean isDeleted() {
224+
return delegate.isDeleted();
225+
}
226+
227+
@Override
228+
public int getSegmentCount() {
229+
return delegate.getSegmentCount();
230+
}
231+
232+
@Override
233+
public long getGeneration() {
234+
return delegate.getGeneration();
235+
}
236+
237+
@Override
238+
public Map<String, String> getUserData() throws IOException {
239+
return delegate.getUserData();
240+
}
241+
242+
@Override
243+
public String toString() {
244+
return "SnapshotIndexCommit{" + delegate + "}";
245+
}
246+
}
152247
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.lucene.index.SegmentCommitInfo;
3333
import org.apache.lucene.index.SegmentInfos;
3434
import org.apache.lucene.index.SegmentReader;
35-
import org.apache.lucene.index.SnapshotDeletionPolicy;
3635
import org.apache.lucene.index.Term;
3736
import org.apache.lucene.search.IndexSearcher;
3837
import org.apache.lucene.search.ReferenceManager;
@@ -92,7 +91,6 @@
9291
import java.util.concurrent.locks.ReentrantLock;
9392
import java.util.concurrent.locks.ReentrantReadWriteLock;
9493
import java.util.function.BiFunction;
95-
import java.util.stream.Collectors;
9694

9795
public abstract class Engine implements Closeable {
9896

@@ -880,9 +878,10 @@ public void forceMerge(boolean flush) throws IOException {
880878
* Snapshots the index and returns a handle to it. If needed will try and "commit" the
881879
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
882880
*
881+
* @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
883882
* @param flushFirst indicates whether the engine should flush before returning the snapshot
884883
*/
885-
public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException;
884+
public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
886885

887886
/**
888887
* fail engine due to some error. the engine will also be closed.
@@ -1458,9 +1457,9 @@ public static class IndexCommitRef implements Closeable {
14581457
private final CheckedRunnable<IOException> onClose;
14591458
private final IndexCommit indexCommit;
14601459

1461-
IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException {
1462-
indexCommit = deletionPolicy.snapshot();
1463-
onClose = () -> deletionPolicy.release(indexCommit);
1460+
IndexCommitRef(IndexCommit indexCommit, CheckedRunnable<IOException> onClose) {
1461+
this.indexCommit = indexCommit;
1462+
this.onClose = onClose;
14641463
}
14651464

14661465
@Override

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public class InternalEngine extends Engine {
126126

127127
private final String uidField;
128128

129-
private final SnapshotDeletionPolicy snapshotDeletionPolicy;
129+
private final CombinedDeletionPolicy combinedDeletionPolicy;
130130

131131
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
132132
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -185,9 +185,8 @@ public InternalEngine(EngineConfig engineConfig) {
185185
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), startingCommit);
186186
assert translog.getGeneration() != null;
187187
this.translog = translog;
188-
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
189-
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
190-
);
188+
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
189+
translog::getLastSyncedGlobalCheckpoint);
191190
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
192191
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
193192
assert engineConfig.getForceNewHistoryUUID() == false
@@ -1699,20 +1698,16 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
16991698
}
17001699

17011700
@Override
1702-
public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException {
1701+
public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
17031702
// we have to flush outside of the readlock otherwise we might have a problem upgrading
17041703
// the to a write lock when we fail the engine in this operation
17051704
if (flushFirst) {
17061705
logger.trace("start flush for snapshot");
17071706
flush(false, true);
17081707
logger.trace("finish flush for snapshot");
17091708
}
1710-
try (ReleasableLock lock = readLock.acquire()) {
1711-
logger.trace("pulling snapshot");
1712-
return new IndexCommitRef(snapshotDeletionPolicy);
1713-
} catch (IOException e) {
1714-
throw new SnapshotFailedEngineException(shardId, e);
1715-
}
1709+
final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
1710+
return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
17161711
}
17171712

17181713
private boolean failOnTragicEvent(AlreadyClosedException ex) {
@@ -1883,7 +1878,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create, IndexCommit start
18831878
iwc.setCommitOnClose(false); // we by default don't commit on close
18841879
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
18851880
iwc.setIndexCommit(startingCommit);
1886-
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
1881+
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
18871882
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
18881883
boolean verbose = false;
18891884
try {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,13 +1101,14 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
11011101
* Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
11021102
* commit won't be freed until the commit / snapshot is closed.
11031103
*
1104+
* @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
11041105
* @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
11051106
*/
1106-
public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException {
1107+
public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
11071108
IndexShardState state = this.state; // one time volatile read
11081109
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
11091110
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
1110-
return getEngine().acquireIndexCommit(flushFirst);
1111+
return getEngine().acquireIndexCommit(safeCommit, flushFirst);
11111112
} else {
11121113
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
11131114
}
@@ -1141,7 +1142,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
11411142
return store.getMetadata(null, true);
11421143
}
11431144
}
1144-
indexCommit = engine.acquireIndexCommit(false);
1145+
indexCommit = engine.acquireIndexCommit(false, false);
11451146
return store.getMetadata(indexCommit.getIndexCommit());
11461147
} finally {
11471148
store.decRef();

server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
4848
store.incRef();
4949
boolean success = false;
5050
try {
51-
indexCommit = shard.acquireIndexCommit(true);
51+
indexCommit = shard.acquireIndexCommit(false, true);
5252
success = true;
5353
} finally {
5454
if (success == false) {

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ final void ensureOpen() {
246246
*
247247
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
248248
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
249-
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
249+
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
250250
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
251251
* directory
252252
* @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an
@@ -270,7 +270,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
270270
*
271271
* {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
272272
* {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
273-
* {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
273+
* {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
274274
*
275275
* @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
276276
* directory

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
159159
} else {
160160
final Engine.IndexCommitRef phase1Snapshot;
161161
try {
162-
phase1Snapshot = shard.acquireIndexCommit(false);
162+
phase1Snapshot = shard.acquireIndexCommit(true, false);
163163
} catch (final Exception e) {
164164
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
165165
}

server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina
415415
final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
416416
try {
417417
// we flush first to make sure we get the latest writes snapshotted
418-
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) {
418+
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
419419
repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
420420
if (logger.isDebugEnabled()) {
421421
StringBuilder details = new StringBuilder();

0 commit comments

Comments
 (0)