6060import java .util .stream .Collectors ;
6161import java .util .stream .LongStream ;
6262import java .util .stream .Stream ;
63+ import java .util .stream .StreamSupport ;
6364
6465/**
6566 * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
@@ -462,6 +463,7 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
462463 */
463464 public synchronized void renewPeerRecoveryRetentionLeases () {
464465 assert primaryMode ;
466+ assert invariant ();
465467
466468 /*
467469 * Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in
@@ -471,28 +473,40 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
471473 */
472474 final long renewalTimeMillis = currentTimeMillisSupplier .getAsLong () - indexSettings .getRetentionLeaseMillis () / 2 ;
473475
474- for (ShardRouting shardRouting : routingTable ) {
475- if (shardRouting .assignedToNode ()) {
476- final CheckpointState checkpointState = checkpoints .get (shardRouting .allocationId ().getId ());
476+ /*
477+ * If any of the peer-recovery retention leases need renewal, it's a good opportunity to renew them all.
478+ */
479+ final boolean renewalNeeded = StreamSupport .stream (routingTable .spliterator (), false ).filter (ShardRouting ::assignedToNode )
480+ .anyMatch (shardRouting -> {
477481 final RetentionLease retentionLease = retentionLeases .get (getPeerRecoveryRetentionLeaseId (shardRouting ));
478482 if (retentionLease == null ) {
479- if (checkpointState .tracked ) {
480- /*
481- * BWC: We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention
482- * leases for every shard copy. TODO create leases lazily
483- */
484- assert indexSettings .getIndexVersionCreated ().before (Version .V_8_0_0 ) : indexSettings .getIndexVersionCreated ();
485- }
486- } else {
487- if (retentionLease .retainingSequenceNumber () <= checkpointState .globalCheckpoint
488- || retentionLease .timestamp () <= renewalTimeMillis ) {
483+ /*
484+ * If this shard copy is tracked then we got here here via a rolling upgrade from an older version that doesn't
485+ * create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation.
486+ */
487+ assert checkpoints .get (shardRouting .allocationId ().getId ()).tracked == false
488+ || indexSettings .getIndexVersionCreated ().before (Version .V_8_0_0 );
489+ return false ;
490+ }
491+ return retentionLease .timestamp () <= renewalTimeMillis
492+ || retentionLease .retainingSequenceNumber () <= checkpoints .get (shardRouting .allocationId ().getId ()).globalCheckpoint ;
493+ });
494+
495+ if (renewalNeeded ) {
496+ for (ShardRouting shardRouting : routingTable ) {
497+ if (shardRouting .assignedToNode ()) {
498+ final RetentionLease retentionLease = retentionLeases .get (getPeerRecoveryRetentionLeaseId (shardRouting ));
499+ if (retentionLease != null ) {
500+ final CheckpointState checkpointState = checkpoints .get (shardRouting .allocationId ().getId ());
489501 renewRetentionLease (getPeerRecoveryRetentionLeaseId (shardRouting ),
490502 Math .max (0L , checkpointState .globalCheckpoint + 1L ),
491503 PEER_RECOVERY_RETENTION_LEASE_SOURCE );
492504 }
493505 }
494506 }
495507 }
508+
509+ assert invariant ();
496510 }
497511
498512 public static class CheckpointState implements Writeable {
0 commit comments