Skip to content

Commit

Permalink
[Remote Store - Dual Replication] Create missing Retention Leases for…
Browse files Browse the repository at this point in the history
… docrep shard copies during failover (#13159)

Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 authored Apr 16, 2024
1 parent 035d8b8 commit 3c8eafd
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class RemoteDualReplicationIT extends MigrationBaseTestCase {
private final String REMOTE_PRI_DOCREP_REP = "remote-primary-docrep-replica";
private final String REMOTE_PRI_DOCREP_REMOTE_REP = "remote-primary-docrep-remote-replica";
private final String FAILOVER_REMOTE_TO_DOCREP = "failover-remote-to-docrep";
private final String FAILOVER_REMOTE_TO_REMOTE = "failover-remote-to-remote";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down Expand Up @@ -241,14 +242,63 @@ RLs on remote enabled copies are brought up to (GlobalCkp + 1) upon a flush requ
*/
extraSettings = Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "3s").build();
testRemotePrimaryDocRepAndRemoteReplica();
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
assertBusy(() -> {
for (ShardStats shardStats : internalCluster().client()
pollAndCheckRetentionLeases(REMOTE_PRI_DOCREP_REMOTE_REP);
}

public void testMissingRetentionLeaseCreatedOnFailedOverRemoteReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting docrep data node");
internalCluster().startDataOnlyNode();

Settings zeroReplicasAndOverridenSyncIntervals = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.build();
createIndex(FAILOVER_REMOTE_TO_REMOTE, zeroReplicasAndOverridenSyncIntervals);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);

indexBulk(FAILOVER_REMOTE_TO_REMOTE, 100);

logger.info("---> Starting first remote node");
initDocRepToRemoteMigration();
addRemote = true;
String firstRemoteNode = internalCluster().startDataOnlyNode();
String primaryShardHostingNode = primaryNodeName(FAILOVER_REMOTE_TO_REMOTE);
logger.info("---> Moving primary copy from {} to remote enabled node {}", primaryShardHostingNode, firstRemoteNode);
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, primaryShardHostingNode, firstRemoteNode))
.get()
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_REMOTE, 100, 0);

String secondRemoteNode = internalCluster().startDataOnlyNode();
Settings twoReplicas = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build();
assertAcked(
internalCluster().client()
.admin()
.indices()
.prepareStats(REMOTE_PRI_DOCREP_REMOTE_REP)
.prepareUpdateSettings()
.setIndices(FAILOVER_REMOTE_TO_REMOTE)
.setSettings(twoReplicas)
.get()
.getShards()) {
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);

logger.info("---> Checking retention leases");
pollAndCheckRetentionLeases(FAILOVER_REMOTE_TO_REMOTE);
}

private void pollAndCheckRetentionLeases(String indexName) throws Exception {
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
assertBusy(() -> {
for (ShardStats shardStats : internalCluster().client().admin().indices().prepareStats(indexName).get().getShards()) {
ShardRouting shardRouting = shardStats.getShardRouting();
DiscoveryNode discoveryNode = nodes.get(shardRouting.currentNodeId());
RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L

private final Function<String, Boolean> isShardOnRemoteEnabledNode;

/**
* Flag to indicate whether {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}
* has been run successfully
*/
private boolean createdMissingRetentionLeases;

/**
* Get all retention leases tracked on this shard.
*
Expand Down Expand Up @@ -955,7 +961,13 @@ private boolean invariant() {
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
}

if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases
// Skip assertion if createMissingPeerRecoveryRetentionLeases has not yet run after activating primary context
// This is required since during an ongoing remote store migration,
// remote enabled primary taking over primary context from another remote enabled shard
// might not have retention leases for docrep shard copies
// (since all RetentionLease sync actions are blocked on remote shard copies)
&& createdMissingRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
final CheckpointState cps = checkpoints.get(shardRouting.allocationId().getId());
Expand Down Expand Up @@ -1843,19 +1855,34 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() {
assert invariant();
}

private synchronized void setCreatedMissingRetentionLeases() {
createdMissingRetentionLeases = true;
assert invariant();
}

public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
return hasAllPeerRecoveryRetentionLeases;
}

/**
* Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
* prior to {@code LegacyESVersion#V_7_4_0} that does not create peer-recovery retention leases.
* Create any required peer-recovery retention leases that do not currently exist. This can happen if either:
* - We just did a rolling upgrade from a version prior to {@code LegacyESVersion#V_7_4_0} that does not create peer-recovery retention leases.
* - In a mixed mode cluster (during remote store migration), a remote enabled primary shard copy fails over to another remote enabled shard copy,
* but the replication group still has other shards in docrep nodes
*/
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
if (hasAllPeerRecoveryRetentionLeases == false) {
// Create missing RetentionLeases if the primary is on a remote enabled
// and the replication group has at-least one shard copy in docrep enabled node
// No-Op if retention leases for the tracked shard copy already exists
boolean createMissingRetentionLeasesDuringMigration = indexSettings.isAssignedOnRemoteNode()
&& replicationGroup.getReplicationTargets()
.stream()
.anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()) == false);
if (hasAllPeerRecoveryRetentionLeases == false || createMissingRetentionLeasesDuringMigration) {
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
setHasAllPeerRecoveryRetentionLeases();
setCreatedMissingRetentionLeases();
listener.onResponse(null);
}, listener::onFailure), shardRoutings.size());
for (ShardRouting shardRouting : shardRoutings) {
Expand Down

0 comments on commit 3c8eafd

Please sign in to comment.