Skip to content

Commit eb36b82

Browse files
authored
Seq Number based recovery should validate last lucene commit max seq# (#22851)
The seq# base recovery logic relies on rolling back lucene to remove any operations above the global checkpoint. This part of the plan is not implemented yet but have to have these guarantees. Instead we should make the seq# logic validate that the last commit point (and the only one we have) maintains the invariant and if not, fall back to file based recovery. This commit adds a test that creates situation where rollback is needed (primary failover with ops in flight) and fixes another issue that was surfaced by it - if a primary can't serve a seq# based recovery request and does a file copy, it still used the incoming `startSeqNo` as a filter. Relates to #22484 & #10708
1 parent 29f63c7 commit eb36b82

File tree

14 files changed

+294
-119
lines changed

14 files changed

+294
-119
lines changed

core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,16 @@ long getMaxSeqNo() {
139139
return nextSeqNo - 1;
140140
}
141141

142+
143+
/**
144+
* constructs a {@link SeqNoStats} object, using local state and the supplied global checkpoint
145+
*
146+
* @implNote this is needed to make sure the local checkpoint and max seq no are consistent
147+
*/
148+
synchronized SeqNoStats getStats(final long globalCheckpoint) {
149+
return new SeqNoStats(getMaxSeqNo(), getCheckpoint(), globalCheckpoint);
150+
}
151+
142152
/**
143153
* Waits for all operations up to the provided sequence number to complete.
144154
*

core/src/main/java/org/elasticsearch/index/seqno/SeqNoStats.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public class SeqNoStats implements ToXContent, Writeable {
3939
private final long globalCheckpoint;
4040

4141
public SeqNoStats(long maxSeqNo, long localCheckpoint, long globalCheckpoint) {
42+
assert localCheckpoint <= maxSeqNo:
43+
"local checkpoint [" + localCheckpoint + "] is above maximum seq no [" + maxSeqNo + "]";
44+
// note that the the global checkpoint can be higher from both maxSeqNo and localCheckpoint
45+
// as we use this stats object to describe lucene commits as well as live statistic.
4246
this.maxSeqNo = maxSeqNo;
4347
this.localCheckpoint = localCheckpoint;
4448
this.globalCheckpoint = globalCheckpoint;

core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void markSeqNoAsCompleted(final long seqNo) {
111111
* @return stats encapuslating the maximum sequence number, the local checkpoint and the global checkpoint
112112
*/
113113
public SeqNoStats stats() {
114-
return new SeqNoStats(getMaxSeqNo(), getLocalCheckpoint(), getGlobalCheckpoint());
114+
return localCheckpointTracker.getStats(getGlobalCheckpoint());
115115
}
116116

117117
/**

core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,10 @@ private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest r
199199
Supplier<Long> currentClusterStateVersionSupplier = () -> clusterService.state().getVersion();
200200
if (shard.indexSettings().isOnSharedFilesystem()) {
201201
handler = new SharedFSRecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
202-
this::delayNewRecoveries, logger);
202+
this::delayNewRecoveries, settings);
203203
} else {
204204
handler = new RecoverySourceHandler(shard, recoveryTarget, request, currentClusterStateVersionSupplier,
205-
this::delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), logger);
205+
this::delayNewRecoveries, recoverySettings.getChunkSize().bytesAsInt(), settings);
206206
}
207207
return handler;
208208
}

core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.index.IndexNotFoundException;
4242
import org.elasticsearch.index.engine.RecoveryEngineException;
4343
import org.elasticsearch.index.mapper.MapperException;
44+
import org.elasticsearch.index.seqno.SeqNoStats;
4445
import org.elasticsearch.index.seqno.SequenceNumbersService;
4546
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
4647
import org.elasticsearch.index.shard.IndexEventListener;
@@ -61,7 +62,6 @@
6162
import org.elasticsearch.transport.TransportService;
6263

6364
import java.io.IOException;
64-
import java.util.Optional;
6565
import java.util.concurrent.atomic.AtomicLong;
6666
import java.util.concurrent.atomic.AtomicReference;
6767

@@ -365,7 +365,15 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
365365
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
366366
try {
367367
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.indexShard().shardPath().resolveTranslog());
368-
return recoveryTarget.store().loadSeqNoStats(globalCheckpoint).getLocalCheckpoint() + 1;
368+
final SeqNoStats seqNoStats = recoveryTarget.store().loadSeqNoStats(globalCheckpoint);
369+
if (seqNoStats.getMaxSeqNo() <= seqNoStats.getGlobalCheckpoint()) {
370+
// commit point is good for seq no based recovery as the maximum seq# including in it
371+
// is below the global checkpoint (i.e., it excludes any ops thay may not be on the primary)
372+
// Recovery will start at the first op after the local check point stored in the commit.
373+
return seqNoStats.getLocalCheckpoint() + 1;
374+
} else {
375+
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
376+
}
369377
} catch (final IOException e) {
370378
// this can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
371379
// translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and

0 commit comments

Comments
 (0)