Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RW Separation] Add routing preference to route requests only to search replicas. #15563

Merged
merged 8 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
- [Reader Writer Separation] Add routing preference for search replicas ([#15563](https://github.com/opensearch-project/OpenSearch/pull/15563))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@

package org.opensearch.indices.settings;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -110,7 +113,6 @@ public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOExceptio
// add back a node
internalCluster().startDataOnlyNode();
ensureGreen(TEST_INDEX);

}

public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException {
Expand Down Expand Up @@ -175,6 +177,39 @@ public void testSearchReplicaScaling() {
assertActiveSearchShards(0);
}

/**
 * Verifies that a search issued with the {@code _search_replica} preference is answered by the
 * node that hosts the index's search-only replica.
 */
public void testSearchReplicaRoutingPreference() throws IOException {
    int numSearchReplicas = 1;
    int numWriterReplicas = 1;
    internalCluster().startClusterManagerOnlyNode();
    // first data node; the returned node name is not needed by this test
    internalCluster().startDataOnlyNode();
    createIndex(
        TEST_INDEX,
        Settings.builder()
            .put(indexSettings())
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
            .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
            .build()
    );
    ensureYellow(TEST_INDEX);
    client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
    // add 2 nodes for the replicas
    internalCluster().startDataOnlyNodes(2);
    ensureGreen(TEST_INDEX);

    assertActiveShardCounts(numSearchReplicas, numWriterReplicas);

    // set preference to search replica here - we default to this when there are
    // search replicas but tests will randomize this value if unset
    SearchResponse response = client().prepareSearch(TEST_INDEX)
        .setPreference(Preference.SEARCH_REPLICA.type())
        .setQuery(QueryBuilders.matchAllQuery())
        .get();

    // The node that produced the hit must be the one holding the search-only replica.
    String actualNodeId = response.getHits().getAt(0).getShard().getNodeId();
    IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
    String expectedNodeId = indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId();
    assertEquals(expectedNodeId, actualNodeId);
}

/**
* Helper to assert counts of active shards for each type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,15 +647,11 @@ public ShardIterator replicaActiveInitializingShardIt() {
return new PlainShardIterator(shardId, Collections.emptyList());
}

LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
return filterAndOrderShards(replica -> true);
}

/**
 * Returns an iterator over the search-only replicas of this shard, with active copies
 * ordered ahead of initializing ones.
 */
public ShardIterator searchReplicaActiveInitializingShardIt() {
    final Predicate<ShardRouting> searchOnly = routing -> routing.isSearchOnly();
    return filterAndOrderShards(searchOnly);
}

/**
Expand Down Expand Up @@ -686,6 +682,20 @@ public ShardIterator replicaFirstActiveInitializingShardsIt() {
return new PlainShardIterator(shardId, ordered);
}

/**
 * Builds an iterator over the shuffled replicas accepted by {@code filter}. Active copies are
 * pushed to the front and initializing copies appended to the back; replicas in any other
 * state are left out.
 *
 * @param filter decides which replicas are eligible for the iterator
 * @return an iterator over the eligible replicas, active ones before initializing ones
 */
private ShardIterator filterAndOrderShards(Predicate<ShardRouting> filter) {
    final LinkedList<ShardRouting> result = new LinkedList<>();
    for (ShardRouting candidate : shuffler.shuffle(replicas)) {
        if (filter.test(candidate) == false) {
            continue;
        }
        if (candidate.active()) {
            result.addFirst(candidate);
        } else if (candidate.initializing()) {
            result.addLast(candidate);
        }
    }
    return new PlainShardIterator(shardId, result);
}

/**
* Returns an iterator on active and initializing shards residing on the provided nodeId.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class OperationRouting {
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;
private volatile boolean ignoreWeightedRouting;
private final boolean isReaderWriterSplitEnabled;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -141,6 +142,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings);
mch2 marked this conversation as resolved.
Show resolved Hide resolved
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand Down Expand Up @@ -254,6 +256,14 @@ public GroupShardsIterator<ShardIterator> searchShards(
preference = Preference.PRIMARY_FIRST.type();
}

if (isReaderWriterSplitEnabled) {
if (preference == null || preference.isEmpty()) {
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
preference = Preference.SEARCH_REPLICA.type();
}
}
}

ShardIterator iterator = preferenceActiveShardIterator(
shard,
clusterState.nodes().getLocalNodeId(),
Expand Down Expand Up @@ -366,6 +376,8 @@ private ShardIterator preferenceActiveShardIterator(
return indexShard.primaryFirstActiveInitializingShardsIt();
case REPLICA_FIRST:
return indexShard.replicaFirstActiveInitializingShardsIt();
case SEARCH_REPLICA:
return indexShard.searchReplicaActiveInitializingShardIt();
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public enum Preference {
*/
REPLICA_FIRST("_replica_first"),

/**
* Route to search replica shards
*/
SEARCH_REPLICA("_search_replica"),

/**
* Route to the local shard only
*/
Expand Down Expand Up @@ -127,6 +132,8 @@ public static Preference parse(String preference) {
return ONLY_LOCAL;
case "_only_nodes":
return ONLY_NODES;
case "_search_replica":
return SEARCH_REPLICA;
default:
throw new IllegalArgumentException("no Preference for [" + preferenceType + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,82 @@ public void testPartialIndexPrimaryDefault() throws Exception {
}
}

/**
 * Verifies that, with the reader/writer split feature flag enabled and no explicit preference,
 * {@code searchShards} returns only search-only replicas and lists the initializing copy last.
 */
public void testSearchReplicaDefaultRouting() throws Exception {
    final int numShards = 1;
    final int numReplicas = 2;
    final int numSearchReplicas = 2;
    final String indexName = "test";
    final String[] indexNames = new String[] { indexName };

    ClusterService clusterService = null;
    ThreadPool threadPool = null;

    try {
        OperationRouting opRouting = new OperationRouting(
            Settings.builder().put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, "true").build(),
            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
        );

        ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(
            indexNames,
            numShards,
            numReplicas,
            numSearchReplicas
        );
        IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(indexName).getShards().get(0);
        ShardId shardId = indexShardRoutingTable.searchOnlyReplicas().get(0).shardId();

        threadPool = new TestThreadPool("testSearchReplicaDefaultRouting");
        clusterService = ClusterServiceUtils.createClusterService(threadPool);

        // add a search replica in initializing state:
        DiscoveryNode node = new DiscoveryNode(
            "node_initializing",
            OpenSearchTestCase.buildNewFakeTransportAddress(),
            Collections.emptyMap(),
            new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES),
            Version.CURRENT
        );

        IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
            .settings(Settings.builder().put(state.metadata().index(indexName).getSettings()).build())
            .numberOfSearchReplicas(3)
            .numberOfReplicas(2)
            .build();
        Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadata, false).generateClusterUuidIfNeeded();
        IndexRoutingTable.Builder indexShardRoutingBuilder = IndexRoutingTable.builder(indexMetadata.getIndex());
        indexShardRoutingBuilder.addIndexShard(indexShardRoutingTable);
        indexShardRoutingBuilder.addShard(
            TestShardRouting.newShardRouting(shardId, node.getId(), null, false, true, ShardRoutingState.INITIALIZING, null)
        );
        state = ClusterState.builder(state)
            .routingTable(RoutingTable.builder().add(indexShardRoutingBuilder).build())
            .metadata(metadataBuilder.build())
            .build();

        // With no explicit preference, routing should default to the search replicas only
        GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, null);
        assertThat("one group per shard", groupIterator.size(), equalTo(numShards));
        for (ShardIterator shardIterator : groupIterator) {
            assertEquals("We should have 3 shards returned", 3, shardIterator.size());
            final int lastPosition = shardIterator.size() - 1;
            int position = 0;
            for (ShardRouting shardRouting : shardIterator) {
                assertTrue("Only search replicas should exist with preference SEARCH_REPLICA", shardRouting.isSearchOnly());
                // The two started search replicas come first; the initializing copy must be the final entry.
                if (position == lastPosition) {
                    assertTrue("Initializing shard should appear last", shardRouting.initializing());
                    assertFalse("Initializing shard should appear last", shardRouting.active());
                }
                position++;
            }
        }
    } finally {
        IOUtils.close(clusterService);
        terminate(threadPool);
    }
}

private DiscoveryNode[] setupNodes() {
// Sets up two data nodes in zone-a and one data node in zone-b
List<String> zones = Arrays.asList("a", "a", "b");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
Expand Down Expand Up @@ -325,7 +326,18 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index,
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
*/
public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) {
    // Delegates to the four-argument overload, requesting zero search replicas.
    final int noSearchReplicas = 0;
    return stateWithAssignedPrimariesAndReplicas(indices, numberOfShards, numberOfReplicas, noSearchReplicas);
}

/**
* Creates cluster state with several indexes, shards, replicas and search replicas and all shards STARTED.
*/
public static ClusterState stateWithAssignedPrimariesAndReplicas(
String[] indices,
int numberOfShards,
int numberOfReplicas,
int numberOfSearchReplicas
) {
int numberOfDataNodes = numberOfReplicas + 1;
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
for (int i = 0; i < numberOfDataNodes + 1; i++) {
Expand All @@ -347,6 +359,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, numberOfSearchReplicas)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
)
.build();
Expand All @@ -363,6 +376,19 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
TestShardRouting.newShardRouting(index, i, newNode(replica + 1).getId(), null, false, ShardRoutingState.STARTED)
);
}
for (int replica = numberOfReplicas; replica < numberOfSearchReplicas + numberOfReplicas; replica++) {
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(
new ShardId(index, IndexMetadata.INDEX_UUID_NA_VALUE, i),
newNode(replica + 1).getId(),
null,
false,
true,
ShardRoutingState.STARTED,
null
)
);
}
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
}
routingTableBuilder.add(indexRoutingTableBuilder.build());
Expand Down
Loading