Skip to content

Commit 950cede

Browse files
committed
Restore local checkpoint tracker on promotion
When a shard is promoted to primary, it's possible that it was previously a replica that started following a new primary. When it started following this new primary, the state of its local checkpoint tracker was reset. Upon promotion, it's possible that the state of the local checkpoint tracker has not yet been restored by a successful primary-replica re-sync. To account for this, we must restore the state of the local checkpoint tracker when a replica shard is promoted to primary. To do this, we stream the operations in the translog, marking the operations that are in the translog as completed. We do this before we fill the gaps on the newly promoted primary, ensuring that we have a primary shard with a complete history up to the largest maximum sequence number it has ever seen.
1 parent 7dcd81b commit 950cede

File tree

5 files changed

+141
-1
lines changed

5 files changed

+141
-1
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1445,6 +1445,15 @@ public interface Warmer {
14451445
*/
14461446
public abstract void deactivateThrottling();
14471447

1448+
/**
1449+
* Marks the operations in the translog that are above the specified local checkpoint as completed. This is used to restore the state
1450+
* of the local checkpoint tracker on primary promotion.
1451+
*
1452+
* @param localCheckpoint the local checkpoint above which translog operations are marked as completed
1453+
* @throws IOException if an I/O exception occurred reading the translog
1454+
*/
1455+
public abstract void restoreLocalCheckpointTracker(long localCheckpoint) throws IOException;
1456+
14481457
/**
14491458
* Fills up the local checkpoints history with no-ops until the local checkpoint
14501459
* and the max seen sequence ID are identical.

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,22 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
232232
logger.trace("created new InternalEngine");
233233
}
234234

235+
@Override
236+
public void restoreLocalCheckpointTracker(final long localCheckpoint) throws IOException {
237+
try (ReleasableLock ignored = writeLock.acquire()) {
238+
ensureOpen();
239+
try (Translog.View view = getTranslog().newView()) {
240+
final Translog.Snapshot snapshot = view.snapshot(localCheckpoint + 1);
241+
Translog.Operation operation;
242+
while ((operation = snapshot.next()) != null) {
243+
if (operation.seqNo() > localCheckpoint) {
244+
seqNoService().markSeqNoAsCompleted(operation.seqNo());
245+
}
246+
}
247+
}
248+
}
249+
}
250+
235251
@Override
236252
public int fillSeqNoGaps(long primaryTerm) throws IOException {
237253
try (ReleasableLock ignored = writeLock.acquire()) {

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ private void updatePrimaryTerm(
467467
() -> {
468468
latch.await();
469469
try {
470+
getEngine().restoreLocalCheckpointTracker(getLocalCheckpoint());
470471
getEngine().fillSeqNoGaps(newPrimaryTerm);
471472
primaryReplicaSyncer.accept(IndexShard.this, new ActionListener<ResyncTask>() {
472473
@Override
@@ -1660,6 +1661,15 @@ public long getLocalCheckpoint() {
16601661
return getEngine().seqNoService().getLocalCheckpoint();
16611662
}
16621663

1664+
/**
1665+
* Returns the maximum sequence number for the shard, as reported by the sequence number service of the shard's engine.
1666+
*
1667+
* @return the maximum sequence number
1668+
*/
1669+
public long getMaxSeqNo() {
1670+
return getEngine().seqNoService().getMaxSeqNo();
1671+
}
1672+
16631673
/**
16641674
* Returns the global checkpoint for the shard.
16651675
*

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3987,6 +3987,59 @@ private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws En
39873987
}
39883988
}
39893989

3990+
public void testRestoreLocalCheckpointTracker() throws IOException {
3991+
engine.close();
3992+
InternalEngine actualEngine = null;
3993+
try {
3994+
final Set<Long> completedSeqNos = new HashSet<>();
3995+
final SequenceNumbersService seqNoService =
3996+
new SequenceNumbersService(
3997+
shardId,
3998+
defaultSettings,
3999+
SequenceNumbersService.NO_OPS_PERFORMED,
4000+
SequenceNumbersService.NO_OPS_PERFORMED,
4001+
SequenceNumbersService.UNASSIGNED_SEQ_NO) {
4002+
@Override
4003+
public void markSeqNoAsCompleted(long seqNo) {
4004+
super.markSeqNoAsCompleted(seqNo);
4005+
completedSeqNos.add(seqNo);
4006+
}
4007+
};
4008+
actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
4009+
@Override
4010+
public SequenceNumbersService seqNoService() {
4011+
return seqNoService;
4012+
}
4013+
};
4014+
final int operations = randomIntBetween(0, 1024);
4015+
final Set<Long> expectedCompletedSeqNos = new HashSet<>();
4016+
for (int i = 0; i < operations; i++) {
4017+
if (rarely() && i < operations - 1) {
4018+
continue;
4019+
}
4020+
final String id = Integer.toString(i);
4021+
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
4022+
final Term uid = newUid(doc);
4023+
actualEngine.index(
4024+
new Engine.Index(uid, doc, i, 1, 1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), System.nanoTime(), false));
4025+
expectedCompletedSeqNos.add((long) i);
4026+
}
4027+
final long currentLocalCheckpoint = actualEngine.seqNoService().getLocalCheckpoint();
4028+
final long resetLocalCheckpoint =
4029+
randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
4030+
actualEngine.seqNoService().resetLocalCheckpoint(resetLocalCheckpoint);
4031+
completedSeqNos.clear();
4032+
actualEngine.restoreLocalCheckpointTracker(actualEngine.seqNoService().getLocalCheckpoint());
4033+
final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
4034+
intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
4035+
assertThat(completedSeqNos, equalTo(intersection));
4036+
assertThat(actualEngine.seqNoService().getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
4037+
assertThat(actualEngine.seqNoService().generateSeqNo(), equalTo((long) operations));
4038+
} finally {
4039+
IOUtils.close(actualEngine);
4040+
}
4041+
}
4042+
39904043
public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
39914044
final int docs = randomIntBetween(1, 32);
39924045
int numDocsOnReplica = 0;

core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import org.elasticsearch.index.mapper.ParsedDocument;
8181
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
8282
import org.elasticsearch.index.mapper.SourceToParse;
83-
import org.elasticsearch.index.seqno.SequenceNumbers;
8483
import org.elasticsearch.index.seqno.SequenceNumbersService;
8584
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
8685
import org.elasticsearch.index.store.Store;
@@ -699,6 +698,59 @@ private void finish() {
699698
closeShards(indexShard);
700699
}
701700

701+
public void testRestoreLocalCheckpointTrackerStateOnPromotion() throws IOException, InterruptedException {
702+
final IndexShard indexShard = newStartedShard(false);
703+
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
704+
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
705+
706+
final long maxSeqNo = indexShard.getMaxSeqNo();
707+
final long globalCheckpointOnReplica = SequenceNumbersService.UNASSIGNED_SEQ_NO;
708+
randomIntBetween(
709+
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
710+
Math.toIntExact(indexShard.getLocalCheckpoint()));
711+
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);
712+
713+
final int globalCheckpoint =
714+
randomIntBetween(
715+
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
716+
Math.toIntExact(indexShard.getLocalCheckpoint()));
717+
718+
final CountDownLatch latch = new CountDownLatch(1);
719+
indexShard.acquireReplicaOperationPermit(
720+
indexShard.getPrimaryTerm() + 1,
721+
globalCheckpoint,
722+
new ActionListener<Releasable>() {
723+
@Override
724+
public void onResponse(Releasable releasable) {
725+
releasable.close();
726+
latch.countDown();
727+
}
728+
729+
@Override
730+
public void onFailure(Exception e) {
731+
732+
}
733+
},
734+
ThreadPool.Names.SAME);
735+
736+
latch.await();
737+
738+
final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary();
739+
final CountDownLatch resyncLatch = new CountDownLatch(1);
740+
indexShard.updateShardState(
741+
newRouting,
742+
indexShard.getPrimaryTerm() + 1,
743+
(s, r) -> resyncLatch.countDown(),
744+
1L,
745+
Collections.singleton(newRouting.allocationId().getId()),
746+
Collections.emptySet());
747+
resyncLatch.await();
748+
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
749+
assertThat(indexShard.getMaxSeqNo(), equalTo(maxSeqNo));
750+
751+
closeShards(indexShard);
752+
}
753+
702754
public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
703755
final IndexShard indexShard = newStartedShard(false);
704756

0 commit comments

Comments
 (0)