Promote replica on the highest version node #25277
Changes from 6 commits
```diff
@@ -24,6 +24,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.Assertions;
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
```
```diff
@@ -320,14 +321,18 @@ public ShardRouting activePrimary(ShardId shardId) {
     /**
      * Returns one active replica shard for the given shard id or <code>null</code> if
      * no active replica is found.
+     *
+     * Since replicas could possibly be on nodes with an older version of ES than
+     * the primary is, this will return replicas on the highest version of ES.
+     *
      */
-    public ShardRouting activeReplica(ShardId shardId) {
-        for (ShardRouting shardRouting : assignedShards(shardId)) {
-            if (!shardRouting.primary() && shardRouting.active()) {
-                return shardRouting;
-            }
-        }
-        return null;
+    public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
+        return assignedShards(shardId).stream()
+                .filter(shr -> !shr.primary() && shr.active())
+                .filter(shr -> node(shr.currentNodeId()) != null)
+                .max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
+                        Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
+                .orElse(null);
     }

     /**
```

Contributor: can you re-add the comment why we need to consider "null" here?

Member (Author): Re-added this comment
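The "null" in question appears to be the `DiscoveryNode` returned by `node(shr.currentNodeId()).node()`, which can be absent in some states; `Comparator.nullsFirst` ranks such entries lowest, so a replica with an unknown node version is only picked when nothing better exists. A minimal standalone sketch of that ordering behavior, using plain `java.util` and a made-up `VersionedShard` stand-in (not an Elasticsearch class):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class NullsFirstDemo {
    // Hypothetical stand-in for a replica whose node version may be unknown (null).
    record VersionedShard(String id, String nodeVersion) {}

    public static void main(String[] args) {
        List<VersionedShard> replicas = List.of(
                new VersionedShard("r1", "5.6.0"),
                new VersionedShard("r2", null),     // node version unknown
                new VersionedShard("r3", "6.0.0"));

        // nullsFirst ranks null keys lowest, so max() prefers a known, higher version.
        Optional<VersionedShard> best = replicas.stream()
                .max(Comparator.comparing(VersionedShard::nodeVersion,
                        Comparator.nullsFirst(Comparator.naturalOrder())));

        // Prints r3: the replica on the highest known version wins. r2 would only
        // be chosen if every replica's node version were null.
        best.ifPresent(s -> System.out.println(s.id()));
    }
}
```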
|
```diff
@@ -567,7 +572,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
         if (failedShard.relocatingNodeId() == null) {
             if (failedShard.primary()) {
                 // promote active replica to primary if active replica exists (only the case for shadow replicas)
-                ShardRouting activeReplica = activeReplica(failedShard.shardId());
+                ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
                 if (activeReplica == null) {
                     moveToUnassigned(failedShard, unassignedInfo);
                 } else {
@@ -596,7 +601,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
         assert failedShard.active();
         if (failedShard.primary()) {
             // promote active replica to primary if active replica exists
-            ShardRouting activeReplica = activeReplica(failedShard.shardId());
+            ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
             if (activeReplica == null) {
                 moveToUnassigned(failedShard, unassignedInfo);
             } else {
```
```diff
@@ -19,12 +19,14 @@

 package org.elasticsearch.cluster.routing.allocation;

 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
@@ -35,6 +37,7 @@
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.VersionUtils;

 import java.util.ArrayList;
 import java.util.Collections;
```
```diff
@@ -499,7 +502,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() {
             Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)));
         assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
         assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
-        ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId);
+        ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);

         // fail the primary shard, check replicas get removed as well...
```
```diff
@@ -556,4 +559,119 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle
         ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
         assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
     }
+
+    public void testReplicaOnNewestVersionIsPromoted() {
```

Contributor: This test checks one specific scenario. I think that it can be easily generalized in the way of the
```diff
+        AllocationService allocation = createAllocationService(Settings.builder().build());
+
+        MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test")
+                .settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)).build();
+
+        RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
+
+        ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+                .metaData(metaData).routingTable(initialRoutingTable).build();
+
+        ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0);
+
+        // add a single node
+        clusterState = ClusterState.builder(clusterState).nodes(
+                DiscoveryNodes.builder()
+                        .add(newNode("node1-5.x", Version.V_5_6_0)))
```
Contributor: Can you generalize the test to use two arbitrary (but distinct) versions? i.e.

Member (Author): No? Currently the only situation that is valid for a mixed-major-version cluster is 5.6 and 6.0, we don't support mixed clusters of any other versions and 5.6.1 isn't out yet. I'm not sure how randomization would help here, other than triggering some other version-related failures :)

Contributor: this PR does more than just an ordering on 5.6/6.x. It also orders among 6.0 and 6.1 nodes, which is left untested here. Either we restrict the "Promote replica on the highest version node" logic to only order 6.x nodes before 5.6 (and leave 6.0 and 6.1 unordered) or we test that this logic also properly orders 6.0 and 6.1. I agree there is no need to test 5.1 and 6.2.

Member (Author): Okay, I randomized the versions
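As an aside, a sketch of the generalization being discussed, using the `VersionUtils.randomVersionBetween` helper that the final test adopts below; the distinct-version retry loop and the `twoDistinctRandomVersions` helper name are illustrative, not part of this PR:

```java
// Sketch only (assumes an ESTestCase context providing random()): draw two
// distinct random versions so a "highest version wins" ordering is observable.
private Version[] twoDistinctRandomVersions() {
    Version v1 = VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null);
    Version v2 = VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null);
    while (v2.equals(v1)) { // retry; fine as long as the range holds more than one version
        v2 = VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null);
    }
    // order them: lower first, higher second, so the test can assert the
    // replica on the higher version is the one that gets promoted
    return v1.before(v2) ? new Version[] {v1, v2} : new Version[] {v2, v1};
}
```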
```diff
+                .build();
+        clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
+        assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
+
+        // start primary shard
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3));
+
+        // add another 5.6 node
+        clusterState = ClusterState.builder(clusterState).nodes(
+                DiscoveryNodes.builder(clusterState.nodes())
+                        .add(newNode("node2-5.x", Version.V_5_6_0)))
+                .build();
+
+        // start the shards, should have 1 primary and 1 replica available
+        clusterState = allocation.reroute(clusterState, "reroute");
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
+
+        clusterState = ClusterState.builder(clusterState).nodes(
+                DiscoveryNodes.builder(clusterState.nodes())
+                        .add(newNode("node3-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null)))
+                        .add(newNode("node4-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null))))
+                .build();
+
+        // start all the replicas
+        clusterState = allocation.reroute(clusterState, "reroute");
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
+        assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));
+
+        ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
+        logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);
+
+        // fail the primary shard and make sure the correct replica is promoted
+        ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
+        ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail);
+        assertThat(newState, not(equalTo(clusterState)));
+        clusterState = newState;
+        // the primary gets allocated on another node
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3));
+
+        ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
+        assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
+        assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
+
+        Version replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
+        assertNotNull(replicaNodeVersion);
+        logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
+
+        for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
+            if ("node1-5.x".equals(cursor.value.getId())) {
+                // Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check
+                continue;
+            }
+            Version nodeVer = cursor.value.getVersion();
+            assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
+                    replicaNodeVersion.onOrAfter(nodeVer));
+        }
+
+        startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
+        logger.info("--> failing primary shard a second time, should select: {}", startedReplica);
+
+        // fail the primary shard again, and ensure the same thing happens
+        ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
+        newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail);
+        assertThat(newState, not(equalTo(clusterState)));
+        clusterState = newState;
+        // the primary gets allocated on another node
+        assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
+
+        newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
+        assertThat(newPrimaryShard, not(equalTo(secondPrimaryShardToFail)));
+        assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId()));
+
+        replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion();
+        assertNotNull(replicaNodeVersion);
+        logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion);
+
+        for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
+            if (primaryShardToFail.currentNodeId().equals(cursor.value.getId()) ||
+                    secondPrimaryShardToFail.currentNodeId().equals(cursor.value.getId())) {
+                // Skip the nodes the failed primaries were on, they don't have a replica so don't need a version check
+                continue;
+            }
+            Version nodeVer = cursor.value.getVersion();
+            assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
+                    replicaNodeVersion.onOrAfter(nodeVer));
+        }
+    }
 }
```
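The assertions above lean on `Version#onOrAfter`, which is a `>=` check on the version ordering: the promoted replica's node version must be on or after every other replica's node version. A quick standalone illustration (the version strings are arbitrary examples):

```java
import org.elasticsearch.Version;

public class VersionOrderDemo {
    public static void main(String[] args) {
        // Version.fromString parses release strings into comparable Version objects.
        Version v56 = Version.fromString("5.6.0");
        Version v60 = Version.fromString("6.0.0");

        System.out.println(v60.onOrAfter(v56)); // true:  6.0.0 >= 5.6.0
        System.out.println(v56.onOrAfter(v60)); // false: 5.6.0 <  6.0.0
        System.out.println(v56.onOrAfter(v56)); // true:  equality counts as "on"
    }
}
```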
```diff
@@ -19,6 +19,7 @@

 package org.elasticsearch.indices.cluster;

 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
@@ -50,6 +51,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -72,6 +74,8 @@
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
```
```diff
@@ -139,6 +143,87 @@ public void testRandomClusterStateUpdates() {
         logger.info("Final cluster state: {}", state);
     }

+    public void testRandomClusterPromotesNewestReplica() {
+        // we have an IndicesClusterStateService per node in the cluster
+        final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
+        ClusterState state = randomInitialClusterState(clusterStateServiceMap, MockIndicesService::new);
+
+        // randomly add nodes of mixed versions
+        logger.info("--> adding random nodes");
+        for (int i = 0; i < randomIntBetween(4, 8); i++) {
+            DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
+                    .add(createRandomVersionNode()).build();
+            state = ClusterState.builder(state).nodes(newNodes).build();
+            state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after changing nodes
+            updateNodes(state, clusterStateServiceMap, MockIndicesService::new);
+        }
+
+        // log the node versions (for debugging if necessary)
+        for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) {
+            Version nodeVer = cursor.value.getVersion();
+            logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer);
+        }
+
+        // randomly create some indices
+        logger.info("--> creating some indices");
+        for (int i = 0; i < randomIntBetween(2, 5); i++) {
+            String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT);
+            Settings.Builder settingsBuilder = Settings.builder()
+                    .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
+                    .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3))
+                    .put("index.routing.allocation.total_shards_per_node", 1);
+            CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
+            state = cluster.createIndex(state, request);
+            assertTrue(state.metaData().hasIndex(name));
+        }
+        state = cluster.reroute(state, new ClusterRerouteRequest());
+
+        ClusterState previousState = state;
+        // apply cluster state to nodes (incl. master)
+        for (DiscoveryNode node : state.nodes()) {
+            IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node);
+            ClusterState localState = adaptClusterStateToLocalNode(state, node);
+            ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node);
+            final ClusterChangedEvent event = new ClusterChangedEvent("simulating change", localState, previousLocalState);
+            indicesClusterStateService.applyClusterState(event);
+
+            // check that cluster state has been properly applied to node
+            assertClusterStateMatchesNodeState(localState, indicesClusterStateService);
+        }
+
+        logger.info("--> starting shards");
+        state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
+        state = cluster.reroute(state, new ClusterRerouteRequest());
+        logger.info("--> starting replicas");
+        state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
+        state = cluster.reroute(state, new ClusterRerouteRequest());
+
+        logger.info("--> state before failing shards: {}", state);
+        for (int i = 0; i < randomIntBetween(5, 10); i++) {
+            for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) {
+                if (shardRouting.primary() && randomBoolean()) {
+                    ShardRouting replicaToBePromoted = state.getRoutingNodes()
+                            .activeReplicaWithHighestVersion(shardRouting.shardId());
+                    if (replicaToBePromoted != null) {
+                        Version replicaNodeVersion = state.nodes().getDataNodes()
+                                .get(replicaToBePromoted.currentNodeId()).getVersion();
+                        List<FailedShard> shardsToFail = new ArrayList<>();
+                        logger.info("--> found replica that should be promoted: {}", replicaToBePromoted);
+                        logger.info("--> failing shard {}", shardRouting);
+                        shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception()));
+                        state = cluster.applyFailedShards(state, shardsToFail);
+                        ShardRouting newPrimary = state.routingTable().index(shardRouting.index())
+                                .shard(shardRouting.id()).primaryShard();
+                        assertThat(newPrimary.allocationId().getId(),
+                                equalTo(replicaToBePromoted.allocationId().getId()));
+                    }
+                }
+                state = cluster.reroute(state, new ClusterRerouteRequest());
+            }
+        }
+    }

     /**
      * This test ensures that when a node joins a brand new cluster (different cluster UUID),
      * different from the cluster it was previously a part of, the in-memory index data structures
```
```diff
@@ -388,6 +473,16 @@ protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) {
             Version.CURRENT);
     }

+    protected DiscoveryNode createRandomVersionNode(DiscoveryNode.Role... mustHaveRoles) {
+        Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values())));
+        for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) {
+            roles.add(mustHaveRole);
+        }
+        final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet());
+        return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles,
+                VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null));
+    }

     private static ClusterState adaptClusterStateToLocalNode(ClusterState state, DiscoveryNode node) {
         return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build();
     }
```
Contributor: unused import?