Skip to content

Commit 3aa3edc

Browse files
committed
Promote replica on the highest version node
This changes the replica selection to prefer replicas on the highest-version node when choosing a replacement to promote after the primary shard fails. Consider this situation:

- A replica on a 5.6 node
- Another replica on a 6.0 node
- The primary on a 6.0 node

The primary shard is sending sequence numbers to the replica on the 6.0 node and skipping them for the 5.6 node. Now assume that the primary shard fails and (prior to this change) the replica on the 5.6 node gets promoted to primary: it has no knowledge of sequence numbers, while the replica on the 6.0 node will be expecting sequence numbers it will never receive.

Relates to #10708
1 parent 2fb4a0d commit 3aa3edc

File tree

2 files changed

+144
-6
lines changed

2 files changed

+144
-6
lines changed

core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.logging.log4j.Logger;
2525
import org.apache.lucene.util.CollectionUtil;
2626
import org.elasticsearch.Assertions;
27+
import org.elasticsearch.Version;
2728
import org.elasticsearch.cluster.ClusterState;
2829
import org.elasticsearch.cluster.metadata.IndexMetaData;
2930
import org.elasticsearch.cluster.metadata.MetaData;
@@ -67,6 +68,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
6768

6869
private final Map<String, RoutingNode> nodesToShards = new HashMap<>();
6970

71+
private final Map<String, Version> nodesToVersions = new HashMap<>();
72+
7073
private final UnassignedShards unassignedShards = new UnassignedShards(this);
7174

7275
private final Map<ShardId, List<ShardRouting>> assignedShards = new HashMap<>();
@@ -94,6 +97,7 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) {
9497
// fill in the nodeToShards with the "live" nodes
9598
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
9699
nodesToShards.put(cursor.value.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order
100+
nodesToVersions.put(cursor.value.getId(), cursor.value.getVersion());
97101
}
98102

99103
// fill in the inverse of node -> shards allocated
@@ -320,14 +324,32 @@ public ShardRouting activePrimary(ShardId shardId) {
320324
/**
321325
* Returns one active replica shard for the given shard id or <code>null</code> if
322326
* no active replica is found.
327+
*
328+
* Since replicas could possibly be on nodes with a older version of ES than
329+
* the primary is, this will return replicas on the highest version of ES.
330+
*
323331
*/
324-
public ShardRouting activeReplica(ShardId shardId) {
332+
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
333+
Version highestVersionSeen = null;
334+
ShardRouting candidate = null;
325335
for (ShardRouting shardRouting : assignedShards(shardId)) {
326336
if (!shardRouting.primary() && shardRouting.active()) {
327-
return shardRouting;
337+
// It's possible for replicaNodeVersion to be null, when deassociating dead nodes
338+
// that have been removed, the shards are failed, and part of the shard failing
339+
// calls this method with an out-of-date RoutingNodes, where the version might not
340+
// be accessible. Therefore, we need to protect against the version being null
341+
// (meaning the node will be going away).
342+
Version replicaNodeVersion = nodesToVersions.get(shardRouting.currentNodeId());
343+
if (replicaNodeVersion == null && candidate == null) {
344+
// Only use this replica if there are no other candidates
345+
candidate = shardRouting;
346+
} else if (highestVersionSeen == null || (replicaNodeVersion != null && replicaNodeVersion.after(highestVersionSeen))) {
347+
highestVersionSeen = replicaNodeVersion;
348+
candidate = shardRouting;
349+
}
328350
}
329351
}
330-
return null;
352+
return candidate;
331353
}
332354

333355
/**
@@ -567,7 +589,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
567589
if (failedShard.relocatingNodeId() == null) {
568590
if (failedShard.primary()) {
569591
// promote active replica to primary if active replica exists (only the case for shadow replicas)
570-
ShardRouting activeReplica = activeReplica(failedShard.shardId());
592+
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
571593
if (activeReplica == null) {
572594
moveToUnassigned(failedShard, unassignedInfo);
573595
} else {
@@ -596,7 +618,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
596618
assert failedShard.active();
597619
if (failedShard.primary()) {
598620
// promote active replica to primary if active replica exists
599-
ShardRouting activeReplica = activeReplica(failedShard.shardId());
621+
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
600622
if (activeReplica == null) {
601623
moveToUnassigned(failedShard, unassignedInfo);
602624
} else {

core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919

2020
package org.elasticsearch.cluster.routing.allocation;
2121

22+
import com.carrotsearch.hppc.cursors.ObjectCursor;
2223
import org.apache.logging.log4j.Logger;
2324
import org.elasticsearch.Version;
2425
import org.elasticsearch.cluster.ClusterState;
2526
import org.elasticsearch.cluster.ESAllocationTestCase;
2627
import org.elasticsearch.cluster.metadata.IndexMetaData;
2728
import org.elasticsearch.cluster.metadata.MetaData;
29+
import org.elasticsearch.cluster.node.DiscoveryNode;
2830
import org.elasticsearch.cluster.node.DiscoveryNodes;
2931
import org.elasticsearch.cluster.routing.RoutingNodes;
3032
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -499,7 +501,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() {
499501
Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)));
500502
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
501503
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
502-
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId);
504+
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
503505

504506

505507
// fail the primary shard, check replicas get removed as well...
@@ -556,4 +558,118 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle
556558
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
557559
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
558560
}
561+
562+
public void testReplicaOnNewestVersionIsPromoted() {
563+
AllocationService allocation = createAllocationService(Settings.builder().build());
564+
565+
MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test")
566+
.settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)) .build();
567+
568+
RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
569+
570+
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
571+
.metaData(metaData).routingTable(initialRoutingTable).build();
572+
573+
ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0);
574+
575+
// add a single node
576+
clusterState = ClusterState.builder(clusterState).nodes(
577+
DiscoveryNodes.builder()
578+
.add(newNode("node1-5.x", Version.V_5_6_0)))
579+
.build();
580+
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
581+
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
582+
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
583+
584+
// start primary shard
585+
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
586+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
587+
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
588+
589+
// add another 5.6 node
590+
clusterState = ClusterState.builder(clusterState).nodes(
591+
DiscoveryNodes.builder(clusterState.nodes())
592+
.add(newNode("node2-5.x", Version.V_5_6_0)))
593+
.build();
594+
595+
// start the shards, should have 1 primary and 1 replica available
596+
clusterState = allocation.reroute(clusterState, "reroute");
597+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
598+
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
599+
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
600+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
601+
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
602+
603+
clusterState = ClusterState.builder(clusterState).nodes(
604+
DiscoveryNodes.builder(clusterState.nodes())
605+
.add(newNode("node3-6.x", Version.V_6_0_0_alpha3))
606+
.add(newNode("node4-6.x", Version.V_6_0_0_alpha3)))
607+
.build();
608+
609+
// start all the replicas
610+
clusterState = allocation.reroute(clusterState, "reroute");
611+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
612+
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
613+
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
614+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
615+
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
616+
617+
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
618+
logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);
619+
620+
// fail the primary shard, check replicas get removed as well...
621+
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
622+
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
623+
assertThat(newState, not(equalTo(clusterState)));
624+
clusterState = newState;
625+
// the primary gets allocated on another node
626+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3));
627+
628+
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
629+
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
630+
assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
631+
632+
Version replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
633+
assertNotNull(replicaNodeVersion);
634+
logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
635+
636+
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
637+
if ("node1".equals(cursor.value.getId())) {
638+
// Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check
639+
continue;
640+
}
641+
Version nodeVer = cursor.value.getVersion();
642+
assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
643+
replicaNodeVersion.onOrAfter(nodeVer));
644+
}
645+
646+
startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
647+
logger.info("--> failing primary shard a second time, should select: {}", startedReplica);
648+
649+
// fail the primary shard again, and ensure the same thing happens
650+
primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
651+
newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
652+
assertThat(newState, not(equalTo(clusterState)));
653+
clusterState = newState;
654+
// the primary gets allocated on another node
655+
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
656+
657+
newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
658+
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
659+
assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
660+
661+
replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
662+
assertNotNull(replicaNodeVersion);
663+
logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
664+
665+
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
666+
if ("node1".equals(cursor.value.getId())) {
667+
// Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check
668+
continue;
669+
}
670+
Version nodeVer = cursor.value.getVersion();
671+
assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
672+
replicaNodeVersion.onOrAfter(nodeVer));
673+
}
674+
}
559675
}

0 commit comments

Comments
 (0)