Skip to content

Commit 93311ab

Browse files
authored
Restore local checkpoint tracker on promotion
When a shard is promoted to replica, it's possible that it was previously a replica that started following a new primary. When it started following this new primary, the state of its local checkpoint tracker was reset. Upon promotion, it's possible that the state of the local checkpoint tracker has not yet restored from a successful primary-replica re-sync. To account for this, we must restore the state of the local checkpoint tracker when a replica shard is promoted to primary. To do this, we stream the operations in the translog, marking the operations that are in the translog as completed. We do this before we fill the gaps on the newly promoted primary, ensuring that we have a primary shard with a complete history up to the largest maximum sequence number it has ever seen. Relates #25553
1 parent 2eafbaf commit 93311ab

File tree

5 files changed

+151
-1
lines changed

5 files changed

+151
-1
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1445,6 +1445,14 @@ public interface Warmer {
14451445
*/
14461446
public abstract void deactivateThrottling();
14471447

1448+
/**
1449+
* Marks operations in the translog as completed. This is used to restore the state of the local checkpoint tracker on primary
1450+
* promotion.
1451+
*
1452+
* @throws IOException if an I/O exception occurred reading the translog
1453+
*/
1454+
public abstract void restoreLocalCheckpointFromTranslog() throws IOException;
1455+
14481456
/**
14491457
* Fills up the local checkpoints history with no-ops until the local checkpoint
14501458
* and the max seen sequence ID are identical.

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,23 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
232232
logger.trace("created new InternalEngine");
233233
}
234234

235+
@Override
236+
public void restoreLocalCheckpointFromTranslog() throws IOException {
237+
try (ReleasableLock ignored = writeLock.acquire()) {
238+
ensureOpen();
239+
final long localCheckpoint = seqNoService().getLocalCheckpoint();
240+
try (Translog.View view = getTranslog().newView()) {
241+
final Translog.Snapshot snapshot = view.snapshot(localCheckpoint + 1);
242+
Translog.Operation operation;
243+
while ((operation = snapshot.next()) != null) {
244+
if (operation.seqNo() > localCheckpoint) {
245+
seqNoService().markSeqNoAsCompleted(operation.seqNo());
246+
}
247+
}
248+
}
249+
}
250+
}
251+
235252
@Override
236253
public int fillSeqNoGaps(long primaryTerm) throws IOException {
237254
try (ReleasableLock ignored = writeLock.acquire()) {

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,17 @@ public void updateShardState(final ShardRouting newRouting,
460460
() -> {
461461
latch.await();
462462
try {
463+
/*
464+
* If this shard was serving as a replica shard when another shard was promoted to primary then the state of
465+
* its local checkpoint tracker was reset during the primary term transition. In particular, the local
466+
* checkpoint on this shard was thrown back to the global checkpoint and the state of the local checkpoint
467+
* tracker above the local checkpoint was destroyed. If the other shard that was promoted to primary
468+
* subsequently fails before the primary/replica re-sync completes successfully and we are now being
469+
* promoted, the local checkpoint tracker here could be left in a state where it would re-issue sequence
470+
* numbers. To ensure that this is not the case, we restore the state of the local checkpoint tracker by
471+
* replaying the translog and marking any operations there are completed.
472+
*/
473+
getEngine().restoreLocalCheckpointFromTranslog();
463474
getEngine().fillSeqNoGaps(newPrimaryTerm);
464475
updateLocalCheckpointForShard(currentRouting.allocationId().getId(),
465476
getEngine().seqNoService().getLocalCheckpoint());

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3995,6 +3995,67 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
39953995
}
39963996
}
39973997

3998+
public void testRestoreLocalCheckpointFromTranslog() throws IOException {
3999+
engine.close();
4000+
InternalEngine actualEngine = null;
4001+
try {
4002+
final Set<Long> completedSeqNos = new HashSet<>();
4003+
final SequenceNumbersService seqNoService =
4004+
new SequenceNumbersService(
4005+
shardId,
4006+
defaultSettings,
4007+
SequenceNumbersService.NO_OPS_PERFORMED,
4008+
SequenceNumbersService.NO_OPS_PERFORMED,
4009+
SequenceNumbersService.UNASSIGNED_SEQ_NO) {
4010+
@Override
4011+
public void markSeqNoAsCompleted(long seqNo) {
4012+
super.markSeqNoAsCompleted(seqNo);
4013+
completedSeqNos.add(seqNo);
4014+
}
4015+
};
4016+
actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
4017+
@Override
4018+
public SequenceNumbersService seqNoService() {
4019+
return seqNoService;
4020+
}
4021+
};
4022+
final int operations = randomIntBetween(0, 1024);
4023+
final Set<Long> expectedCompletedSeqNos = new HashSet<>();
4024+
for (int i = 0; i < operations; i++) {
4025+
if (rarely() && i < operations - 1) {
4026+
continue;
4027+
}
4028+
expectedCompletedSeqNos.add((long) i);
4029+
}
4030+
4031+
final ArrayList<Long> seqNos = new ArrayList<>(expectedCompletedSeqNos);
4032+
Randomness.shuffle(seqNos);
4033+
for (final long seqNo : seqNos) {
4034+
final String id = Long.toString(seqNo);
4035+
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
4036+
final Term uid = newUid(doc);
4037+
final long time = System.nanoTime();
4038+
actualEngine.index(new Engine.Index(uid, doc, seqNo, 1, 1, VersionType.EXTERNAL, REPLICA, time, time, false));
4039+
if (rarely()) {
4040+
actualEngine.rollTranslogGeneration();
4041+
}
4042+
}
4043+
final long currentLocalCheckpoint = actualEngine.seqNoService().getLocalCheckpoint();
4044+
final long resetLocalCheckpoint =
4045+
randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
4046+
actualEngine.seqNoService().resetLocalCheckpoint(resetLocalCheckpoint);
4047+
completedSeqNos.clear();
4048+
actualEngine.restoreLocalCheckpointFromTranslog();
4049+
final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
4050+
intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
4051+
assertThat(completedSeqNos, equalTo(intersection));
4052+
assertThat(actualEngine.seqNoService().getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
4053+
assertThat(actualEngine.seqNoService().generateSeqNo(), equalTo((long) operations));
4054+
} finally {
4055+
IOUtils.close(actualEngine);
4056+
}
4057+
}
4058+
39984059
public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
39994060
final int docs = randomIntBetween(1, 32);
40004061
int numDocsOnReplica = 0;

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
8282
import org.elasticsearch.index.mapper.SourceToParse;
8383
import org.elasticsearch.index.mapper.Uid;
84-
import org.elasticsearch.index.seqno.SequenceNumbers;
8584
import org.elasticsearch.index.seqno.SequenceNumbersService;
8685
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
8786
import org.elasticsearch.index.store.Store;
@@ -700,6 +699,60 @@ private void finish() {
700699
closeShards(indexShard);
701700
}
702701

702+
public void testRestoreLocalCheckpointTrackerFromTranslogOnPromotion() throws IOException, InterruptedException {
703+
final IndexShard indexShard = newStartedShard(false);
704+
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
705+
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
706+
707+
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
708+
final long globalCheckpointOnReplica = SequenceNumbersService.UNASSIGNED_SEQ_NO;
709+
randomIntBetween(
710+
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
711+
Math.toIntExact(indexShard.getLocalCheckpoint()));
712+
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);
713+
714+
final int globalCheckpoint =
715+
randomIntBetween(
716+
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
717+
Math.toIntExact(indexShard.getLocalCheckpoint()));
718+
719+
final CountDownLatch latch = new CountDownLatch(1);
720+
indexShard.acquireReplicaOperationPermit(
721+
indexShard.getPrimaryTerm() + 1,
722+
globalCheckpoint,
723+
new ActionListener<Releasable>() {
724+
@Override
725+
public void onResponse(Releasable releasable) {
726+
releasable.close();
727+
latch.countDown();
728+
}
729+
730+
@Override
731+
public void onFailure(Exception e) {
732+
733+
}
734+
},
735+
ThreadPool.Names.SAME);
736+
737+
latch.await();
738+
739+
final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary();
740+
final CountDownLatch resyncLatch = new CountDownLatch(1);
741+
indexShard.updateShardState(
742+
newRouting,
743+
indexShard.getPrimaryTerm() + 1,
744+
(s, r) -> resyncLatch.countDown(),
745+
1L,
746+
Collections.singleton(newRouting.allocationId().getId()),
747+
Collections.emptySet(),
748+
Collections.emptySet());
749+
resyncLatch.await();
750+
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
751+
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
752+
753+
closeShards(indexShard);
754+
}
755+
703756
public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
704757
final IndexShard indexShard = newStartedShard(false);
705758

0 commit comments

Comments
 (0)