5252import org .elasticsearch .cluster .routing .RecoverySource .SnapshotRecoverySource ;
5353import org .elasticsearch .cluster .routing .ShardRouting ;
5454import org .elasticsearch .common .Booleans ;
55+ import org .elasticsearch .common .CheckedRunnable ;
5556import org .elasticsearch .common .Nullable ;
5657import org .elasticsearch .common .collect .Tuple ;
5758import org .elasticsearch .common .io .stream .BytesStreamOutput ;
@@ -474,13 +475,11 @@ public void updateShardState(final ShardRouting newRouting,
474475 if (resyncStarted == false ) {
475476 throw new IllegalStateException ("cannot start resync while it's already in progress" );
476477 }
477- indexShardOperationPermits .asyncBlockOperations (
478- 30 ,
479- TimeUnit .MINUTES ,
478+ bumpPrimaryTerm (newPrimaryTerm ,
480479 () -> {
481- shardStateUpdated .await ();
482480 assert pendingPrimaryTerm == newPrimaryTerm :
483- "shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" ;
481+ "shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" +
482+ ", current routing: " + currentRouting + ", new routing: " + newRouting ;
484483 assert operationPrimaryTerm < newPrimaryTerm ;
485484 operationPrimaryTerm = newPrimaryTerm ;
486485 try {
@@ -527,10 +526,8 @@ public void onFailure(Exception e) {
527526 } catch (final AlreadyClosedException e ) {
528527 // okay, the index was deleted
529528 }
530- },
531- e -> failShard ("exception during primary term transition" , e ));
529+ });
532530 replicationTracker .activatePrimaryMode (getLocalCheckpoint ());
533- pendingPrimaryTerm = newPrimaryTerm ;
534531 }
535532 }
536533 // set this last, once we finished updating all internal state.
@@ -2215,7 +2212,20 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
22152212 indexShardOperationPermits .acquire (onPermitAcquired , executorOnDelay , false , debugInfo );
22162213 }
22172214
2218- private final Object primaryTermMutex = new Object ();
2215+ private <E extends Exception > void bumpPrimaryTerm (long newPrimaryTerm , final CheckedRunnable <E > onBlocked ) {
2216+ assert Thread .holdsLock (mutex );
2217+ assert newPrimaryTerm > pendingPrimaryTerm ;
2218+ assert operationPrimaryTerm <= pendingPrimaryTerm ;
2219+ final CountDownLatch termUpdated = new CountDownLatch (1 );
2220+ indexShardOperationPermits .asyncBlockOperations (30 , TimeUnit .MINUTES , () -> {
2221+ assert operationPrimaryTerm <= pendingPrimaryTerm ;
2222+ onBlocked .run ();
2223+ assert operationPrimaryTerm <= pendingPrimaryTerm ;
2224+ },
2225+ e -> failShard ("exception during primary term transition" , e ));
2226+ pendingPrimaryTerm = newPrimaryTerm ;
2227+ termUpdated .countDown ();
2228+ }
22192229
22202230 /**
22212231 * Acquire a replica operation permit whenever the shard is ready for indexing (see
@@ -2238,10 +2248,8 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
22382248 verifyNotClosed ();
22392249 verifyReplicationTarget ();
22402250 if (opPrimaryTerm > pendingPrimaryTerm ) {
2241- synchronized (primaryTermMutex ) {
2251+ synchronized (mutex ) {
22422252 if (opPrimaryTerm > pendingPrimaryTerm ) {
2243- verifyNotClosed ();
2244-
22452253 IndexShardState shardState = state ();
22462254 // only roll translog and update primary term if shard has made it past recovery
22472255 // Having a new primary term here means that the old primary failed and that there is a new primary, which again
@@ -2252,39 +2260,32 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
22522260 throw new IndexShardNotStartedException (shardId , shardState );
22532261 }
22542262
2255- synchronized (mutex ) {
2256- final CountDownLatch termUpdated = new CountDownLatch (1 );
2257- if (opPrimaryTerm > pendingPrimaryTerm ) {
2258- indexShardOperationPermits .asyncBlockOperations (30 , TimeUnit .MINUTES , () -> {
2259- termUpdated .await ();
2260- // a primary promotion, or another primary term transition, might have been triggered concurrently to this
2261- // recheck under the operation permit if we can skip doing this work
2262- if (opPrimaryTerm == pendingPrimaryTerm ) {
2263- assert operationPrimaryTerm < pendingPrimaryTerm ;
2264- operationPrimaryTerm = pendingPrimaryTerm ;
2265- updateGlobalCheckpointOnReplica (globalCheckpoint , "primary term transition" );
2266- final long currentGlobalCheckpoint = getGlobalCheckpoint ();
2267- final long localCheckpoint ;
2268- if (currentGlobalCheckpoint == SequenceNumbers .UNASSIGNED_SEQ_NO ) {
2269- localCheckpoint = SequenceNumbers .NO_OPS_PERFORMED ;
2270- } else {
2271- localCheckpoint = currentGlobalCheckpoint ;
2272- }
2273- logger .trace (
2274- "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]" ,
2275- opPrimaryTerm ,
2276- getLocalCheckpoint (),
2277- localCheckpoint );
2278- getEngine ().resetLocalCheckpoint (localCheckpoint );
2279- getEngine ().rollTranslogGeneration ();
2263+ if (opPrimaryTerm > pendingPrimaryTerm ) {
2264+ bumpPrimaryTerm (opPrimaryTerm , () -> {
2265+ // a primary promotion, or another primary term transition, might have been triggered concurrently to this
2266+ // recheck under the operation permit if we can skip doing this work
2267+ if (opPrimaryTerm == pendingPrimaryTerm ) {
2268+ assert operationPrimaryTerm < pendingPrimaryTerm ;
2269+ operationPrimaryTerm = pendingPrimaryTerm ;
2270+ updateGlobalCheckpointOnReplica (globalCheckpoint , "primary term transition" );
2271+ final long currentGlobalCheckpoint = getGlobalCheckpoint ();
2272+ final long localCheckpoint ;
2273+ if (currentGlobalCheckpoint == SequenceNumbers .UNASSIGNED_SEQ_NO ) {
2274+ localCheckpoint = SequenceNumbers .NO_OPS_PERFORMED ;
22802275 } else {
2281- logger . trace ( "a primary promotion or concurrent primary term transition has made this reset obsolete" ) ;
2276+ localCheckpoint = currentGlobalCheckpoint ;
22822277 }
2283- }, e -> failShard ("exception during primary term transition" , e ));
2284-
2285- pendingPrimaryTerm = opPrimaryTerm ;
2286- termUpdated .countDown ();
2287- }
2278+ logger .trace (
2279+ "detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]" ,
2280+ opPrimaryTerm ,
2281+ getLocalCheckpoint (),
2282+ localCheckpoint );
2283+ getEngine ().resetLocalCheckpoint (localCheckpoint );
2284+ getEngine ().rollTranslogGeneration ();
2285+ } else {
2286+ logger .trace ("a primary promotion or concurrent primary term transition has made this reset obsolete" );
2287+ }
2288+ });
22882289 }
22892290 }
22902291 }
0 commit comments