Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ public DiscoveryNode(StreamInput in) throws IOException {
throw new AssertionError(legacyRole.roleName());
}
}
// we have to assume that an old node has the remote_cluster_client role as it does not serialize that role
roles.add(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
}
this.roles = Collections.unmodifiableSortedSet(new TreeSet<>(roles));
this.version = Version.readVersion(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;

import java.net.InetAddress;
import java.util.HashSet;
Expand All @@ -37,6 +38,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.not;

public class DiscoveryNodeTests extends ESTestCase {
Expand All @@ -61,6 +63,38 @@ public void testRolesAreSorted() {

}

public void testRemoteClusterClientRole() throws Exception {
{
final Version version = VersionUtils.randomVersionBetween(random(),
Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_3_0));
final Set<DiscoveryNodeRole> roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
final DiscoveryNode node = new DiscoveryNode("name", "id", buildNewFakeTransportAddress(), emptyMap(), roles, version);
assertThat(node.isRemoteClusterClient(), equalTo(roles.contains(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)));
final BytesStreamOutput streamOutput = new BytesStreamOutput();
streamOutput.setVersion(version);
node.writeTo(streamOutput);
final StreamInput streamInput = StreamInput.wrap(streamOutput.bytes().toBytesRef().bytes);
streamInput.setVersion(version);
final DiscoveryNode serializedNode = new DiscoveryNode(streamInput);
assertThat(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE, in(serializedNode.getRoles()));
assertTrue(serializedNode.isRemoteClusterClient());
}
{
final HashSet<DiscoveryNodeRole> roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES));
final Version version = VersionUtils.randomVersionBetween(random(), Version.V_7_3_0, Version.CURRENT);
final DiscoveryNode node = new DiscoveryNode("name", "id", buildNewFakeTransportAddress(), emptyMap(), roles, version);
assertThat(node.isRemoteClusterClient(), equalTo(roles.contains(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)));
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(version);
node.writeTo(out);
final StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes);
in.setVersion(version);
final DiscoveryNode serializedNode = new DiscoveryNode(in);
assertThat(serializedNode.getRoles(), equalTo(roles));
assertThat(serializedNode.isRemoteClusterClient(), equalTo(roles.contains(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)));
}
}

public void testDiscoveryNodeIsCreatedWithHostFromInetAddress() throws Exception {
InetAddress inetAddress = randomBoolean() ? InetAddress.getByName("192.0.2.1") :
InetAddress.getByAddress("name1", new byte[] { (byte) 192, (byte) 168, (byte) 0, (byte) 1});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,24 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -146,14 +151,18 @@ public void testAlreadyBootstrappedFollowerIndex() {
}
}

public void testBootstrappingFollowerIndex() {
public void testBootstrappingFollowerIndex() throws Exception {
String index = "test-index";
IndexMetadata.Builder indexMetadata = IndexMetadata.builder(index)
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1).numberOfReplicas(1);
DiscoveryNode dataOnlyNode = newNode("d1", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE));
DiscoveryNode dataAndRemoteNode = newNode("dr1",
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
final DiscoveryNode dataOnlyNode = newNode("d1", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE));
final DiscoveryNode dataAndRemoteNode;
if (randomBoolean()) {
dataAndRemoteNode = newNode("dr1", Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE));
} else {
dataAndRemoteNode = newNodeWithLegacyRoles("dr1");
}
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(dataOnlyNode).add(dataAndRemoteNode).build();
Metadata metadata = Metadata.builder().put(indexMetadata).build();
RoutingTable.Builder routingTable = RoutingTable.builder()
Expand Down Expand Up @@ -181,6 +190,22 @@ public void testBootstrappingFollowerIndex() {
}
}

static DiscoveryNode newNodeWithLegacyRoles(String id) throws IOException {
final Version version = VersionUtils.randomVersionBetween(random(),
Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_3_0));
final Set<DiscoveryNodeRole> roles = Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE);
if (randomBoolean()) {
roles.add(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE);
}
final DiscoveryNode node = new DiscoveryNode(id, buildNewFakeTransportAddress(), emptyMap(), roles, version);
final BytesStreamOutput streamOutput = new BytesStreamOutput();
streamOutput.setVersion(version);
node.writeTo(streamOutput);
final StreamInput streamInput = StreamInput.wrap(streamOutput.bytes().toBytesRef().bytes);
streamInput.setVersion(version);
return new DiscoveryNode(streamInput);
}

static Decision executeAllocation(ClusterState clusterState, ShardRouting shardRouting, DiscoveryNode node) {
final AllocationDecider decider = new CcrPrimaryFollowerAllocationDecider();
final RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(Collections.singletonList(decider)),
Expand Down