Skip to content

Commit 930282e

Browse files
authored
Introduce sequence-number-based recovery
This commit introduces sequence-number-based recovery. When a replica has fallen out of sync, rather than performing a file-based recovery we first attempt to replay operations since the last local checkpoint on the replica. To do this, at the start of recovery the replica tells the primary what its local checkpoint is. The primary will then wait for all operations between that local checkpoint and the current maximum sequence number to complete; this is to ensure that there are no gaps in the operations that will be replayed from the primary to the replica. This is a best-effort attempt as we currently have no guarantees on the primary that these operations will be available; if we are not able to replay all operations in the desired range, we just fallback to file-based recovery. Later work will strengthen the guarantees. Relates #22484
1 parent 417c93c commit 930282e

33 files changed

+1264
-550
lines changed

buildSrc/src/main/resources/checkstyle_suppressions.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,6 @@
408408
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoverySettings.java" checks="LineLength" />
409409
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]PeerRecoverySourceService.java" checks="LineLength" />
410410
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryState.java" checks="LineLength" />
411-
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]StartRecoveryRequest.java" checks="LineLength" />
412411
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]IndicesStore.java" checks="LineLength" />
413412
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]store[/\\]TransportNodesListShardStoreMetaData.java" checks="LineLength" />
414413
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]ttl[/\\]IndicesTTLService.java" checks="LineLength" />

core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.elasticsearch.index.fielddata.IndexFieldDataService;
3737
import org.elasticsearch.index.mapper.FieldMapper;
3838
import org.elasticsearch.index.mapper.MapperService;
39-
import org.elasticsearch.index.seqno.LocalCheckpointService;
39+
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
4040
import org.elasticsearch.index.similarity.SimilarityService;
4141
import org.elasticsearch.index.store.FsDirectoryService;
4242
import org.elasticsearch.index.store.Store;
@@ -115,7 +115,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
115115
IndexSettings.ALLOW_UNMAPPED,
116116
IndexSettings.INDEX_CHECK_ON_STARTUP,
117117
IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL,
118-
LocalCheckpointService.SETTINGS_BIT_ARRAYS_SIZE,
118+
LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE,
119119
IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
120120
IndexSettings.MAX_SLICES_PER_SCROLL,
121121
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,

core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRefCounted.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
1920
package org.elasticsearch.common.util.concurrent;
2021

2122
import org.apache.lucene.store.AlreadyClosedException;

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ void setTook(long took) {
379379
void freeze() {
380380
freeze.set(true);
381381
}
382+
382383
}
383384

384385
public static class IndexResult extends Result {

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.elasticsearch.index.merge.MergeStats;
6464
import org.elasticsearch.index.merge.OnGoingMerge;
6565
import org.elasticsearch.index.seqno.SeqNoStats;
66+
import org.elasticsearch.index.seqno.SequenceNumbers;
6667
import org.elasticsearch.index.seqno.SequenceNumbersService;
6768
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
6869
import org.elasticsearch.index.shard.ShardId;
@@ -119,8 +120,6 @@ public class InternalEngine extends Engine {
119120
private final IndexThrottle throttle;
120121

121122
private final SequenceNumbersService seqNoService;
122-
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
123-
static final String MAX_SEQ_NO = "max_seq_no";
124123

125124
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
126125
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -159,11 +158,12 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
159158
switch (openMode) {
160159
case OPEN_INDEX_AND_TRANSLOG:
161160
writer = createWriter(false);
162-
seqNoStats = loadSeqNoStatsFromLuceneAndTranslog(engineConfig.getTranslogConfig(), writer);
161+
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
162+
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
163163
break;
164164
case OPEN_INDEX_CREATE_TRANSLOG:
165165
writer = createWriter(false);
166-
seqNoStats = loadSeqNoStatsFromLucene(SequenceNumbersService.UNASSIGNED_SEQ_NO, writer);
166+
seqNoStats = store.loadSeqNoStats(SequenceNumbersService.UNASSIGNED_SEQ_NO);
167167
break;
168168
case CREATE_INDEX_AND_TRANSLOG:
169169
writer = createWriter(true);
@@ -353,47 +353,6 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
353353
return null;
354354
}
355355

356-
/**
357-
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the translog
358-
* checkpoint (global checkpoint).
359-
*
360-
* @param translogConfig the translog config (for the global checkpoint)
361-
* @param indexWriter the index writer (for the Lucene commit point)
362-
* @return the sequence number stats
363-
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
364-
*/
365-
private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
366-
final TranslogConfig translogConfig,
367-
final IndexWriter indexWriter) throws IOException {
368-
long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
369-
return loadSeqNoStatsFromLucene(globalCheckpoint, indexWriter);
370-
}
371-
372-
/**
373-
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and uses the
374-
* specified global checkpoint.
375-
*
376-
* @param globalCheckpoint the global checkpoint to use
377-
* @param indexWriter the index writer (for the Lucene commit point)
378-
* @return the sequence number stats
379-
*/
380-
private static SeqNoStats loadSeqNoStatsFromLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
381-
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
382-
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
383-
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
384-
final String key = entry.getKey();
385-
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
386-
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
387-
localCheckpoint = Long.parseLong(entry.getValue());
388-
} else if (key.equals(MAX_SEQ_NO)) {
389-
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
390-
maxSeqNo = Long.parseLong(entry.getValue());
391-
}
392-
}
393-
394-
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
395-
}
396-
397356
private SearcherManager createSearcherManager() throws EngineException {
398357
boolean success = false;
399358
SearcherManager searcherManager = null;
@@ -793,7 +752,6 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
793752
if (delete.origin() == Operation.Origin.PRIMARY) {
794753
seqNo = seqNoService().generateSeqNo();
795754
}
796-
797755
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
798756
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
799757
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
@@ -1532,11 +1490,11 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
15321490
final Map<String, String> commitData = new HashMap<>(6);
15331491
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
15341492
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
1535-
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
1493+
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
15361494
if (syncId != null) {
15371495
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
15381496
}
1539-
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
1497+
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
15401498
if (logger.isTraceEnabled()) {
15411499
logger.trace("committing writer with commit data [{}]", commitData);
15421500
}

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointService.java renamed to core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,27 @@
2222
import com.carrotsearch.hppc.ObjectLongHashMap;
2323
import com.carrotsearch.hppc.ObjectLongMap;
2424
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
25+
import org.apache.logging.log4j.Logger;
2526
import org.elasticsearch.index.IndexSettings;
2627
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
2728
import org.elasticsearch.index.shard.ShardId;
2829

2930
import java.util.HashSet;
31+
import java.util.Locale;
3032
import java.util.Set;
3133

3234
import static org.elasticsearch.index.seqno.SequenceNumbersService.UNASSIGNED_SEQ_NO;
3335

3436
/**
35-
* A shard component that is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which
36-
* all lower (or equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the
37-
* master starts them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery.
38-
* These shards have received all old operations via the recovery mechanism and are kept up to date by the various replications actions.
39-
* The set of shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
37+
* This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or
38+
* equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts
39+
* them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards
40+
* have received all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set of
41+
* shards that are taken into account for the global checkpoint calculation are called the "in-sync shards".
4042
* <p>
4143
* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}).
4244
*/
43-
public class GlobalCheckpointService extends AbstractIndexShardComponent {
45+
public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
4446

4547
/*
4648
* This map holds the last known local checkpoint for every active shard and initializing shard copies that has been brought up to speed
@@ -63,14 +65,14 @@ public class GlobalCheckpointService extends AbstractIndexShardComponent {
6365
private long globalCheckpoint;
6466

6567
/**
66-
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint for this
67-
* shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
68+
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
69+
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
6870
*
69-
* @param shardId the shard this service is tracking local checkpoints for
71+
* @param shardId the shard ID
7072
* @param indexSettings the index settings
7173
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
7274
*/
73-
GlobalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
75+
GlobalCheckpointTracker(final ShardId shardId, final IndexSettings indexSettings, final long globalCheckpoint) {
7476
super(shardId, indexSettings);
7577
assert globalCheckpoint >= UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
7678
inSyncLocalCheckpoints = new ObjectLongHashMap<>(1 + indexSettings.getNumberOfReplicas());
@@ -127,8 +129,9 @@ synchronized boolean updateCheckpointOnPrimary() {
127129
minCheckpoint = Math.min(cp.value, minCheckpoint);
128130
}
129131
if (minCheckpoint < globalCheckpoint) {
130-
throw new IllegalStateException(shardId + " new global checkpoint [" + minCheckpoint
131-
+ "] is lower than previous one [" + globalCheckpoint + "]");
132+
final String message =
133+
String.format(Locale.ROOT, "new global checkpoint [%d] is lower than previous one [%d]", minCheckpoint, globalCheckpoint);
134+
throw new IllegalStateException(message);
132135
}
133136
if (globalCheckpoint != minCheckpoint) {
134137
logger.trace("global checkpoint updated to [{}]", minCheckpoint);

core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java renamed to core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,17 @@
2020
package org.elasticsearch.index.seqno;
2121

2222
import org.apache.lucene.util.FixedBitSet;
23+
import org.elasticsearch.common.SuppressForbidden;
2324
import org.elasticsearch.common.settings.Setting;
2425
import org.elasticsearch.index.IndexSettings;
25-
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
26-
import org.elasticsearch.index.shard.ShardId;
2726

2827
import java.util.LinkedList;
2928

3029
/**
31-
* This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which all previous
32-
* sequence numbers have been processed (inclusive).
30+
* This class generates sequences numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all
31+
* previous sequence numbers have been processed (inclusive).
3332
*/
34-
public class LocalCheckpointService extends AbstractIndexShardComponent {
33+
public class LocalCheckpointTracker {
3534

3635
/**
3736
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
@@ -67,17 +66,15 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
6766
private volatile long nextSeqNo;
6867

6968
/**
70-
* Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
71-
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint for this
72-
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
69+
* Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned, or
70+
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint,
71+
* or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
7372
*
74-
* @param shardId the shard this service is providing tracking local checkpoints for
7573
* @param indexSettings the index settings
76-
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
77-
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
74+
* @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
75+
* @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
7876
*/
79-
LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
80-
super(shardId, indexSettings);
77+
public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
8178
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbersService.NO_OPS_PERFORMED) {
8279
throw new IllegalArgumentException(
8380
"local checkpoint must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] "
@@ -107,7 +104,7 @@ synchronized long generateSeqNo() {
107104
*
108105
* @param seqNo the sequence number to mark as completed
109106
*/
110-
synchronized void markSeqNoAsCompleted(final long seqNo) {
107+
public synchronized void markSeqNoAsCompleted(final long seqNo) {
111108
// make sure we track highest seen sequence number
112109
if (seqNo >= nextSeqNo) {
113110
nextSeqNo = seqNo + 1;
@@ -142,10 +139,25 @@ long getMaxSeqNo() {
142139
return nextSeqNo - 1;
143140
}
144141

142+
/**
143+
* Waits for all operations up to the provided sequence number to complete.
144+
*
145+
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
146+
* @throws InterruptedException if the thread was interrupted while blocking on the condition
147+
*/
148+
@SuppressForbidden(reason = "Object#wait")
149+
synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedException {
150+
while (checkpoint < seqNo) {
151+
// notified by updateCheckpoint
152+
this.wait();
153+
}
154+
}
155+
145156
/**
146157
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
147158
* current checkpoint is processed.
148159
*/
160+
@SuppressForbidden(reason = "Object#notifyAll")
149161
private void updateCheckpoint() {
150162
assert Thread.holdsLock(this);
151163
assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 :
@@ -154,19 +166,24 @@ assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() :
154166
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
155167
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
156168
"updateCheckpoint is called but the bit following the checkpoint is not set";
157-
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
158-
FixedBitSet current = processedSeqNo.getFirst();
159-
do {
160-
checkpoint++;
161-
// the checkpoint always falls in the first bit set or just before. If it falls
162-
// on the last bit of the current bit set, we can clean it.
163-
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
164-
processedSeqNo.removeFirst();
165-
firstProcessedSeqNo += bitArraysSize;
166-
assert checkpoint - firstProcessedSeqNo < bitArraysSize;
167-
current = processedSeqNo.peekFirst();
168-
}
169-
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
169+
try {
170+
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
171+
FixedBitSet current = processedSeqNo.getFirst();
172+
do {
173+
checkpoint++;
174+
// the checkpoint always falls in the first bit set or just before. If it falls
175+
// on the last bit of the current bit set, we can clean it.
176+
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) {
177+
processedSeqNo.removeFirst();
178+
firstProcessedSeqNo += bitArraysSize;
179+
assert checkpoint - firstProcessedSeqNo < bitArraysSize;
180+
current = processedSeqNo.peekFirst();
181+
}
182+
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
183+
} finally {
184+
// notifies waiters in waitForOpsToComplete
185+
this.notifyAll();
186+
}
170187
}
171188

172189
/**

core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
public class SeqNoStats implements ToXContent, Writeable {
3131

3232
private static final String SEQ_NO = "seq_no";
33-
private static final String MAX_SEQ_NO = "max";
33+
private static final String MAX_SEQ_NO = "max_seq_no";
3434
private static final String LOCAL_CHECKPOINT = "local_checkpoint";
3535
private static final String GLOBAL_CHECKPOINT = "global_checkpoint";
3636

0 commit comments

Comments
 (0)