Skip to content

Commit bdb0fb2

Browse files
authored
Fully encapsulate LocalCheckpointTracker inside of the engine (#31213)
* Fully encapsulate LocalCheckpointTracker inside of the engine This makes the Engine interface not expose the `LocalCheckpointTracker`, instead exposing the pieces needed (like retrieving the local checkpoint) as individual methods.
1 parent cdb486a commit bdb0fb2

File tree

10 files changed

+129
-81
lines changed

10 files changed

+129
-81
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
import org.elasticsearch.index.mapper.ParseContext.Document;
6363
import org.elasticsearch.index.mapper.ParsedDocument;
6464
import org.elasticsearch.index.merge.MergeStats;
65-
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
65+
import org.elasticsearch.index.seqno.SeqNoStats;
6666
import org.elasticsearch.index.seqno.SequenceNumbers;
6767
import org.elasticsearch.index.shard.ShardId;
6868
import org.elasticsearch.index.store.Store;
@@ -635,11 +635,28 @@ public CommitStats commitStats() {
635635
}
636636

637637
/**
638-
* The sequence number service for this engine.
638+
* @return the local checkpoint for this Engine
639+
*/
640+
public abstract long getLocalCheckpoint();
641+
642+
/**
643+
* Waits for all operations up to the provided sequence number to complete.
639644
*
640-
* @return the sequence number service
645+
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
646+
* @throws InterruptedException if the thread was interrupted while blocking on the condition
647+
*/
648+
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;
649+
650+
/**
651+
* Reset the local checkpoint in the tracker to the given local checkpoint
652+
* @param localCheckpoint the new checkpoint to be set
653+
*/
654+
public abstract void resetLocalCheckpoint(long localCheckpoint);
655+
656+
/**
657+
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
641658
*/
642-
public abstract LocalCheckpointTracker getLocalCheckpointTracker();
659+
public abstract SeqNoStats getSeqNoStats(long globalCheckpoint);
643660

644661
/**
645662
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.elasticsearch.index.merge.MergeStats;
6666
import org.elasticsearch.index.merge.OnGoingMerge;
6767
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
68+
import org.elasticsearch.index.seqno.SeqNoStats;
6869
import org.elasticsearch.index.seqno.SequenceNumbers;
6970
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
7071
import org.elasticsearch.index.shard.ShardId;
@@ -2185,10 +2186,31 @@ public MergeStats getMergeStats() {
21852186
return mergeScheduler.stats();
21862187
}
21872188

2188-
public final LocalCheckpointTracker getLocalCheckpointTracker() {
2189+
// Used only for testing! Package private to prevent anyone else from using it
2190+
LocalCheckpointTracker getLocalCheckpointTracker() {
21892191
return localCheckpointTracker;
21902192
}
21912193

2194+
@Override
2195+
public long getLocalCheckpoint() {
2196+
return localCheckpointTracker.getCheckpoint();
2197+
}
2198+
2199+
@Override
2200+
public void waitForOpsToComplete(long seqNo) throws InterruptedException {
2201+
localCheckpointTracker.waitForOpsToComplete(seqNo);
2202+
}
2203+
2204+
@Override
2205+
public void resetLocalCheckpoint(long localCheckpoint) {
2206+
localCheckpointTracker.resetCheckpoint(localCheckpoint);
2207+
}
2208+
2209+
@Override
2210+
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
2211+
return localCheckpointTracker.getStats(globalCheckpoint);
2212+
}
2213+
21922214
/**
21932215
* Returns the number of times a version was looked up either from the index.
21942216
* Note this is only available if assertions are enabled

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ public void updateShardState(final ShardRouting newRouting,
405405
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
406406

407407
if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
408-
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
408+
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
409409
}
410410

411411
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
@@ -479,8 +479,7 @@ public void updateShardState(final ShardRouting newRouting,
479479
*/
480480
engine.rollTranslogGeneration();
481481
engine.fillSeqNoGaps(newPrimaryTerm);
482-
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(),
483-
getEngine().getLocalCheckpointTracker().getCheckpoint());
482+
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
484483
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
485484
@Override
486485
public void onResponse(ResyncTask resyncTask) {
@@ -506,7 +505,7 @@ public void onFailure(Exception e) {
506505
}
507506
},
508507
e -> failShard("exception during primary term transition", e));
509-
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
508+
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
510509
primaryTerm = newPrimaryTerm;
511510
}
512511
}
@@ -873,7 +872,7 @@ public CommitStats commitStats() {
873872
@Nullable
874873
public SeqNoStats seqNoStats() {
875874
Engine engine = getEngineOrNull();
876-
return engine == null ? null : engine.getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
875+
return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
877876
}
878877

879878
public IndexingStats indexingStats(String... types) {
@@ -1707,7 +1706,7 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
17071706
* @throws InterruptedException if the thread was interrupted while blocking on the condition
17081707
*/
17091708
public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
1710-
getEngine().getLocalCheckpointTracker().waitForOpsToComplete(seqNo);
1709+
getEngine().waitForOpsToComplete(seqNo);
17111710
}
17121711

17131712
/**
@@ -1740,7 +1739,7 @@ public void markAllocationIdAsInSync(final String allocationId, final long local
17401739
* @return the local checkpoint
17411740
*/
17421741
public long getLocalCheckpoint() {
1743-
return getEngine().getLocalCheckpointTracker().getCheckpoint();
1742+
return getEngine().getLocalCheckpoint();
17441743
}
17451744

17461745
/**
@@ -1781,7 +1780,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
17811780
return;
17821781
}
17831782
// only sync if there are not operations in flight
1784-
final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
1783+
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
17851784
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
17861785
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
17871786
final String allocationId = routingEntry().allocationId().getId();
@@ -1818,7 +1817,7 @@ public ReplicationGroup getReplicationGroup() {
18181817
*/
18191818
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
18201819
verifyReplicationTarget();
1821-
final long localCheckpoint = getEngine().getLocalCheckpointTracker().getCheckpoint();
1820+
final long localCheckpoint = getLocalCheckpoint();
18221821
if (globalCheckpoint > localCheckpoint) {
18231822
/*
18241823
* This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
@@ -1847,8 +1846,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
18471846
verifyPrimary();
18481847
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
18491848
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
1850-
getEngine().getLocalCheckpointTracker().getCheckpoint() ==
1851-
primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
1849+
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
18521850
synchronized (mutex) {
18531851
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
18541852
}
@@ -2234,7 +2232,7 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
22342232
operationPrimaryTerm,
22352233
getLocalCheckpoint(),
22362234
localCheckpoint);
2237-
getEngine().getLocalCheckpointTracker().resetCheckpoint(localCheckpoint);
2235+
getEngine().resetLocalCheckpoint(localCheckpoint);
22382236
getEngine().rollTranslogGeneration();
22392237
});
22402238
globalCheckpointUpdated = true;

server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ Index getIndex() {
6262
}
6363

6464
long maxSeqNo() {
65-
return shard.getEngine().getLocalCheckpointTracker().getMaxSeqNo();
65+
return shard.getEngine().getSeqNoStats(-1).getMaxSeqNo();
6666
}
6767

6868
long maxUnsafeAutoIdTimestamp() {

server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.settings.Settings;
3333
import org.elasticsearch.common.util.set.Sets;
3434
import org.elasticsearch.gateway.GatewayAllocator;
35+
import org.elasticsearch.index.engine.EngineTestCase;
3536
import org.elasticsearch.index.shard.IndexShard;
3637
import org.elasticsearch.index.shard.IndexShardTestCase;
3738
import org.elasticsearch.index.shard.ShardId;
@@ -350,7 +351,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
350351
assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1));
351352
}
352353
final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId);
353-
IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno.
354+
EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(oldPrimaryShard)); // Make gap in seqno.
354355
long moreDocs = scaledRandomIntBetween(1, 10);
355356
for (int i = 0; i < moreDocs; i++) {
356357
IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i));

0 commit comments

Comments
 (0)