Skip to content

Commit

Permalink
Add PUT api to update shard routing weights (opensearch-project#4272)
Browse files Browse the repository at this point in the history
* Add PUT api to update shard routing weights

Signed-off-by: Anshu Agarwal <[email protected]>
  • Loading branch information
anshu1106 authored and ashking94 committed Nov 7, 2022
1 parent bdf06df commit f7278e8
Show file tree
Hide file tree
Showing 18 changed files with 1,004 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
- PUT api for weighted shard routing ([#4272](https://github.com/opensearch-project/OpenSearch/pull/4272))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))


### Deprecated

### Removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,8 @@ public void testApiNamingConventions() throws Exception {
"nodes.usage",
"nodes.reload_secure_settings",
"search_shards",
"remote_store.restore", };
"remote_store.restore",
"cluster.put_weighted_routing", };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Set<String> deprecatedMethods = new HashSet<>();
deprecatedMethods.add("indices.force_merge");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"cluster.put_weighted_routing": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/put",
"description": "Updates weighted shard routing weights"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_cluster/routing/awareness/{attribute}/weights",
"methods": [
"PUT"
],
"parts": {
"attribute": {
"type": "string",
"description": "Awareness attribute name"
}
}
}
]
}
}
}
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.TransportAddWeightedRoutingAction;
import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
Expand Down Expand Up @@ -294,6 +296,7 @@
import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction;
import org.opensearch.rest.action.admin.cluster.RestClusterPutWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterRerouteAction;
import org.opensearch.rest.action.admin.cluster.RestClusterSearchShardsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterStateAction;
Expand Down Expand Up @@ -563,6 +566,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
Expand Down Expand Up @@ -744,6 +748,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestCloseIndexAction());
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());
registerHandler.accept(new RestClusterPutWeightedRoutingAction());

registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.opensearch.action.ActionType;

/**
* Action to update weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public final class ClusterAddWeightedRoutingAction extends ActionType<ClusterPutWeightedRoutingResponse> {

public static final ClusterAddWeightedRoutingAction INSTANCE = new ClusterAddWeightedRoutingAction();
public static final String NAME = "cluster:admin/routing/awareness/weights/put";

private ClusterAddWeightedRoutingAction() {
super(NAME, ClusterPutWeightedRoutingResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchGenerationException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.DeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Request to update weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest<ClusterPutWeightedRoutingRequest> {
private static final Logger logger = LogManager.getLogger(ClusterPutWeightedRoutingRequest.class);

private WeightedRouting weightedRouting;
private String attributeName;

public ClusterPutWeightedRoutingRequest() {}

public WeightedRouting getWeightedRouting() {
return weightedRouting;
}

public ClusterPutWeightedRoutingRequest setWeightedRouting(WeightedRouting weightedRouting) {
this.weightedRouting = weightedRouting;
return this;
}

public void attributeName(String attributeName) {
this.attributeName = attributeName;
}

public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException {
super(in);
weightedRouting = new WeightedRouting(in);
}

public ClusterPutWeightedRoutingRequest(String attributeName) {
this.attributeName = attributeName;
}

public void setWeightedRouting(Map<String, String> source) {
try {
if (source.isEmpty()) {
throw new OpenSearchParseException(("Empty request body"));
}
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.map(source);
setWeightedRouting(BytesReference.bytes(builder), builder.contentType());
} catch (IOException e) {
throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e);
}
}

public void setWeightedRouting(BytesReference source, XContentType contentType) {
try (
XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
source,
contentType
)
) {
String attrValue = null;
Map<String, Double> weights = new HashMap<>();
Double attrWeight = null;
XContentParser.Token token;
// move to the first alias
parser.nextToken();
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrValue = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
attrWeight = Double.parseDouble(parser.text());
weights.put(attrValue, attrWeight);
} else {
throw new OpenSearchParseException(
"failed to parse weighted routing request attribute [{}], " + "unknown type",
attrWeight
);
}
}
this.weightedRouting = new WeightedRouting(this.attributeName, weights);
} catch (IOException e) {
logger.error("error while parsing put for weighted routing request object", e);
}
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (weightedRouting == null) {
validationException = addValidationError("Weighted routing request object is null", validationException);
}
if (weightedRouting.attributeName() == null || weightedRouting.attributeName().isEmpty()) {
validationException = addValidationError("Attribute name is missing", validationException);
}
if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) {
validationException = addValidationError("Weights are missing", validationException);
}
int countValueWithZeroWeights = 0;
double weight;
try {
for (Object value : weightedRouting.weights().values()) {
if (value == null) {
validationException = addValidationError(("Weight is null"), validationException);
} else {
weight = Double.parseDouble(value.toString());
countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights;
}
}
} catch (NumberFormatException e) {
validationException = addValidationError(("Weight is not a number"), validationException);
}
if (countValueWithZeroWeights > 1) {
validationException = addValidationError(
(String.format(Locale.ROOT, "More than one [%d] value has weight set as 0", countValueWithZeroWeights)),
validationException
);
}
return validationException;
}

/**
* @param source weights definition from request body
* @return this request
*/
public ClusterPutWeightedRoutingRequest source(Map<String, String> source) {
setWeightedRouting(source);
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
weightedRouting.writeTo(out);
}

@Override
public String toString() {
return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "}";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder;
import org.opensearch.client.OpenSearchClient;
import org.opensearch.cluster.routing.WeightedRouting;

/**
* Request builder to update weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterPutWeightedRoutingRequestBuilder extends ClusterManagerNodeOperationRequestBuilder<
ClusterPutWeightedRoutingRequest,
ClusterPutWeightedRoutingResponse,
ClusterPutWeightedRoutingRequestBuilder> {
public ClusterPutWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterAddWeightedRoutingAction action) {
super(client, action, new ClusterPutWeightedRoutingRequest());
}

public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRouting weightedRouting) {
request.setWeightedRouting(weightedRouting);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.put;

import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Response from updating weights for weighted round-robin search routing policy.
*
* @opensearch.internal
*/
public class ClusterPutWeightedRoutingResponse extends AcknowledgedResponse {
public ClusterPutWeightedRoutingResponse(boolean acknowledged) {
super(acknowledged);
}

public ClusterPutWeightedRoutingResponse(StreamInput in) throws IOException {
super(in);
}
}
Loading

0 comments on commit f7278e8

Please sign in to comment.