Skip to content

Commit 268df76

Browse files
committed
Verify primary mode usage with assertions (#32667)
Primary terms were introduced as part of the sequence-number effort (#10708) and added in ES 5.0. Subsequent work introduced the replication tracker which lets the primary own its replication group (#25692) to coordinate recovery and replication. The replication tracker explicitly exposes whether it is operating in primary mode or replica mode, independent of the ShardRouting object that's associated with a shard. During a primary relocation, for example, the primary mode is transferred between the primary relocation source and the primary relocation target. After transferring this so-called primary context, the old primary becomes a replication target and the new primary the replication source, reflected in the replication tracker on both nodes. With the most recent PR in this area (#32442), we finally have a clean transition between a shard that's operating as a primary and issuing sequence numbers and a shard that's serving as a replication target. The transition from one state to the other is enforced through the operation-permit system, where we block permit acquisition during such changes and perform the transition under this operation block, ensuring that there are no operations in progress while the transition is being performed. This finally allows us to turn the best-effort checks that were put in place to prevent shards from being used in the wrong way (i.e. primary as replica, or replica as primary) into hard assertions, making it easier to catch any bugs in this area.
1 parent f75e159 commit 268df76

File tree

2 files changed

+46
-42
lines changed

2 files changed

+46
-42
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,30 +1481,25 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
14811481
}
14821482
} else {
14831483
if (origin == Engine.Operation.Origin.PRIMARY) {
1484-
verifyPrimary();
1484+
assert assertPrimaryMode();
14851485
} else {
14861486
assert origin == Engine.Operation.Origin.REPLICA;
1487-
verifyReplicationTarget();
1487+
assert assertReplicationTarget();
14881488
}
14891489
if (writeAllowedStates.contains(state) == false) {
14901490
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]");
14911491
}
14921492
}
14931493
}
14941494

1495-
private void verifyPrimary() {
1496-
if (shardRouting.primary() == false) {
1497-
throw new IllegalStateException("shard " + shardRouting + " is not a primary");
1498-
}
1495+
private boolean assertPrimaryMode() {
1496+
assert shardRouting.primary() && replicationTracker.isPrimaryMode() : "shard " + shardRouting + " is not a primary shard in primary mode";
1497+
return true;
14991498
}
15001499

1501-
private void verifyReplicationTarget() {
1502-
final IndexShardState state = state();
1503-
if (shardRouting.primary() && shardRouting.active() && replicationTracker.isPrimaryMode()) {
1504-
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
1505-
throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
1506-
"relocation hand off, state is [" + state + "]");
1507-
}
1500+
private boolean assertReplicationTarget() {
1501+
assert replicationTracker.isPrimaryMode() == false : "shard " + shardRouting + " in primary mode cannot be a replication target";
1502+
return true;
15081503
}
15091504

15101505
private void verifyNotClosed() throws IllegalIndexShardStateException {
@@ -1751,7 +1746,7 @@ public void writeIndexingBuffer() {
17511746
* @param checkpoint the local checkpoint for the shard
17521747
*/
17531748
public void updateLocalCheckpointForShard(final String allocationId, final long checkpoint) {
1754-
verifyPrimary();
1749+
assert assertPrimaryMode();
17551750
verifyNotClosed();
17561751
replicationTracker.updateLocalCheckpoint(allocationId, checkpoint);
17571752
}
@@ -1763,7 +1758,7 @@ public void updateLocalCheckpointForShard(final String allocationId, final long
17631758
* @param globalCheckpoint the global checkpoint
17641759
*/
17651760
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
1766-
verifyPrimary();
1761+
assert assertPrimaryMode();
17671762
verifyNotClosed();
17681763
replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
17691764
}
@@ -1785,7 +1780,7 @@ public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
17851780
* @param allocationId the allocation ID of the shard for which recovery was initiated
17861781
*/
17871782
public void initiateTracking(final String allocationId) {
1788-
verifyPrimary();
1783+
assert assertPrimaryMode();
17891784
replicationTracker.initiateTracking(allocationId);
17901785
}
17911786

@@ -1798,7 +1793,7 @@ public void initiateTracking(final String allocationId) {
17981793
* @param localCheckpoint the current local checkpoint on the shard
17991794
*/
18001795
public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
1801-
verifyPrimary();
1796+
assert assertPrimaryMode();
18021797
replicationTracker.markAllocationIdAsInSync(allocationId, localCheckpoint);
18031798
}
18041799

@@ -1833,7 +1828,7 @@ public long getLastSyncedGlobalCheckpoint() {
18331828
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
18341829
*/
18351830
public ObjectLongMap<String> getInSyncGlobalCheckpoints() {
1836-
verifyPrimary();
1831+
assert assertPrimaryMode();
18371832
verifyNotClosed();
18381833
return replicationTracker.getInSyncGlobalCheckpoints();
18391834
}
@@ -1843,11 +1838,12 @@ public ObjectLongMap<String> getInSyncGlobalCheckpoints() {
18431838
* primary.
18441839
*/
18451840
public void maybeSyncGlobalCheckpoint(final String reason) {
1846-
verifyPrimary();
18471841
verifyNotClosed();
1842+
assert shardRouting.primary() : "only call maybeSyncGlobalCheckpoint on primary shard";
18481843
if (replicationTracker.isPrimaryMode() == false) {
18491844
return;
18501845
}
1846+
assert assertPrimaryMode();
18511847
// only sync if there are not operations in flight
18521848
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
18531849
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
@@ -1873,7 +1869,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
18731869
* @return the replication group
18741870
*/
18751871
public ReplicationGroup getReplicationGroup() {
1876-
verifyPrimary();
1872+
assert assertPrimaryMode();
18771873
verifyNotClosed();
18781874
return replicationTracker.getReplicationGroup();
18791875
}
@@ -1885,7 +1881,7 @@ public ReplicationGroup getReplicationGroup() {
18851881
* @param reason the reason the global checkpoint was updated
18861882
*/
18871883
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
1888-
verifyReplicationTarget();
1884+
assert assertReplicationTarget();
18891885
final long localCheckpoint = getLocalCheckpoint();
18901886
if (globalCheckpoint > localCheckpoint) {
18911887
/*
@@ -1912,8 +1908,7 @@ assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.ST
19121908
* @param primaryContext the sequence number context
19131909
*/
19141910
public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
1915-
verifyPrimary();
1916-
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
1911+
assert shardRouting.primary() && shardRouting.isRelocationTarget() : "only primary relocation target can update allocation IDs from primary context: " + shardRouting;
19171912
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
19181913
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
19191914
synchronized (mutex) {
@@ -1927,7 +1922,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
19271922
* @return {@code true} if there is at least one shard pending in-sync, otherwise false
19281923
*/
19291924
public boolean pendingInSync() {
1930-
verifyPrimary();
1925+
assert assertPrimaryMode();
19311926
return replicationTracker.pendingInSync();
19321927
}
19331928

@@ -2244,7 +2239,7 @@ private EngineConfig newEngineConfig() {
22442239
*/
22452240
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
22462241
verifyNotClosed();
2247-
verifyPrimary();
2242+
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
22482243

22492244
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
22502245
}
@@ -2294,7 +2289,6 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
22942289
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
22952290
final Object debugInfo) {
22962291
verifyNotClosed();
2297-
verifyReplicationTarget();
22982292
if (opPrimaryTerm > pendingPrimaryTerm) {
22992293
synchronized (mutex) {
23002294
if (opPrimaryTerm > pendingPrimaryTerm) {
@@ -2347,6 +2341,7 @@ public void onResponse(final Releasable releasable) {
23472341
operationPrimaryTerm);
23482342
onPermitAcquired.onFailure(new IllegalStateException(message));
23492343
} else {
2344+
assert assertReplicationTarget();
23502345
try {
23512346
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
23522347
} catch (Exception e) {

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.lucene.store.FilterDirectory;
3232
import org.apache.lucene.store.IOContext;
3333
import org.apache.lucene.util.Constants;
34+
import org.elasticsearch.Assertions;
3435
import org.elasticsearch.Version;
3536
import org.elasticsearch.action.ActionListener;
3637
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@@ -559,28 +560,20 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E
559560
ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
560561
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
561562
final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000);
563+
CountDownLatch latch = new CountDownLatch(1);
562564
indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> {
563565
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
566+
latch.countDown();
564567
}, 0L,
565568
Collections.singleton(indexShard.routingEntry().allocationId().getId()),
566569
new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(),
567570
Collections.emptySet());
571+
latch.await();
568572
} else {
569573
indexShard = newStartedShard(true);
570574
}
571575
final long primaryTerm = indexShard.getPendingPrimaryTerm();
572576
assertEquals(0, indexShard.getActiveOperationsCount());
573-
if (indexShard.routingEntry().isRelocationTarget() == false) {
574-
try {
575-
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
576-
indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), permitAcquiredFuture,
577-
ThreadPool.Names.WRITE, "");
578-
permitAcquiredFuture.actionGet();
579-
fail("shard shouldn't accept operations as replica");
580-
} catch (IllegalStateException ignored) {
581-
582-
}
583-
}
584577
Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard);
585578
assertEquals(1, indexShard.getActiveOperationsCount());
586579
Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard);
@@ -589,6 +582,22 @@ public void testOperationPermitsOnPrimaryShards() throws InterruptedException, E
589582
Releasables.close(operation1, operation2);
590583
assertEquals(0, indexShard.getActiveOperationsCount());
591584

585+
if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
586+
assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm,
587+
indexShard.getGlobalCheckpoint(), new ActionListener<Releasable>() {
588+
@Override
589+
public void onResponse(Releasable releasable) {
590+
fail();
591+
}
592+
593+
@Override
594+
public void onFailure(Exception e) {
595+
fail();
596+
}
597+
},
598+
ThreadPool.Names.WRITE, "")).getMessage(), containsString("in primary mode cannot be a replication target"));
599+
}
600+
592601
closeShards(indexShard);
593602
}
594603

@@ -646,11 +655,11 @@ public void testOperationPermitOnReplicaShards() throws Exception {
646655
logger.info("shard routing to {}", shardRouting);
647656

648657
assertEquals(0, indexShard.getActiveOperationsCount());
649-
if (shardRouting.primary() == false) {
650-
final IllegalStateException e =
651-
expectThrows(IllegalStateException.class,
652-
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, ""));
653-
assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
658+
if (shardRouting.primary() == false && Assertions.ENABLED) {
659+
final AssertionError e =
660+
expectThrows(AssertionError.class,
661+
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, ""));
662+
assertThat(e, hasToString(containsString("acquirePrimaryOperationPermit should only be called on primary shard")));
654663
}
655664

656665
final long primaryTerm = indexShard.getPendingPrimaryTerm();

0 commit comments

Comments
 (0)