Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955))
- Integrate remote segment store in the failover flow ([#5579](https://github.com/opensearch-project/OpenSearch/pull/5579))
- Gracefully handle concurrent zone decommission action ([#5542](https://github.com/opensearch-project/OpenSearch/pull/5542))

### Deprecated
- Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
Expand All @@ -59,6 +61,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static org.opensearch.test.NodeRoles.onlyRole;
Expand Down Expand Up @@ -961,6 +964,114 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

public void testConcurrentDecommissionAction() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'");
internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build()
);
logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
);

ensureStableCluster(6);
ClusterHealthResponse health = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes(Integer.toString(6))
.execute()
.actionGet();
assertFalse(health.isTimedOut());

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

AtomicInteger numRequestAcknowledged = new AtomicInteger();
AtomicInteger numRequestUnAcknowledged = new AtomicInteger();
AtomicInteger numRequestFailed = new AtomicInteger();
int concurrentRuns = randomIntBetween(5, 10);
TestThreadPool testThreadPool = null;
logger.info("--> starting {} concurrent decommission action in zone {}", concurrentRuns, 'a');
try {
testThreadPool = new TestThreadPool(AwarenessAttributeDecommissionIT.class.getName());
List<Runnable> operationThreads = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns);
for (int i = 0; i < concurrentRuns; i++) {
Runnable thread = () -> {
logger.info("Triggering decommission action");
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
try {
DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest)
.get();
if (decommissionResponse.isAcknowledged()) {
numRequestAcknowledged.incrementAndGet();
} else {
numRequestUnAcknowledged.incrementAndGet();
}
} catch (Exception e) {
numRequestFailed.incrementAndGet();
}
countDownLatch.countDown();
};
operationThreads.add(thread);
}
TestThreadPool finalTestThreadPool = testThreadPool;
operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
countDownLatch.await();
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
assertEquals(concurrentRuns, numRequestAcknowledged.get() + numRequestUnAcknowledged.get() + numRequestFailed.get());
assertEquals(concurrentRuns - 1, numRequestFailed.get());
assertEquals(1, numRequestAcknowledged.get() + numRequestUnAcknowledged.get());
}

private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener {

final CountDownLatch doneLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Register decommission request.
* <p>
* Registers a decommission request with decommission attribute and timeout
*
* @opensearch.internal
Expand All @@ -32,7 +30,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionR
public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

private DecommissionAttribute decommissionAttribute;

private String requestID;
private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

// holder for no_delay param. To avoid draining time timeout.
Expand All @@ -49,6 +47,7 @@ public DecommissionRequest(StreamInput in) throws IOException {
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
this.noDelay = in.readBoolean();
this.requestID = in.readOptionalString();
}

@Override
Expand All @@ -57,6 +56,7 @@ public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
out.writeBoolean(noDelay);
out.writeOptionalString(requestID);
}

/**
Expand All @@ -77,25 +77,45 @@ public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

public void setDelayTimeout(TimeValue delayTimeout) {
public DecommissionRequest setDelayTimeout(TimeValue delayTimeout) {
this.delayTimeout = delayTimeout;
return this;
}

public TimeValue getDelayTimeout() {
return this.delayTimeout;
}

public void setNoDelay(boolean noDelay) {
public DecommissionRequest setNoDelay(boolean noDelay) {
if (noDelay) {
this.delayTimeout = TimeValue.ZERO;
}
this.noDelay = noDelay;
return this;
}

public boolean isNoDelay() {
return noDelay;
}

/**
* Sets id for decommission request
*
* @param requestID uuid for request
* @return this request
*/
public DecommissionRequest setRequestID(String requestID) {
this.requestID = requestID;
return this;
}

/**
* @return Returns id of decommission request
*/
public String requestID() {
return requestID;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -122,6 +142,13 @@ public ActionRequestValidationException validate() {

@Override
public String toString() {
return "DecommissionRequest{" + "decommissionAttribute=" + decommissionAttribute + '}';
return "DecommissionRequest{"
+ "decommissionAttribute="
+ decommissionAttribute
+ ", delayTimeout="
+ delayTimeout
+ ", noDelay="
+ noDelay
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,15 @@ public DecommissionRequestBuilder setNoDelay(boolean noDelay) {
request.setNoDelay(noDelay);
return this;
}

/**
* Sets request id for decommission request
*
* @param requestID for decommission request
* @return current object
*/
public DecommissionRequestBuilder requestID(String requestID) {
request.setRequestID(requestID);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable<Custom>

private final DecommissionAttribute decommissionAttribute;
private DecommissionStatus status;
private String requestID;
public static final String attributeType = "awareness";

/**
Expand All @@ -45,18 +46,19 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable<Custom>
* @param decommissionAttribute attribute details
* @param status current status of the attribute decommission
*/
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status) {
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status, String requestId) {
this.decommissionAttribute = decommissionAttribute;
this.status = status;
this.requestID = requestId;
}

/**
* Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT}
* Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} and request id
*
* @param decommissionAttribute attribute details
*/
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute) {
this(decommissionAttribute, DecommissionStatus.INIT);
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, String requestID) {
this(decommissionAttribute, DecommissionStatus.INIT, requestID);
}

/**
Expand All @@ -77,6 +79,15 @@ public DecommissionStatus status() {
return this.status;
}

/**
* Returns the request id of the decommission
*
* @return request id
*/
public String requestID() {
return this.requestID;
}

/**
* Returns instance of the metadata with updated status
* @param newStatus status to be updated with
Expand Down Expand Up @@ -128,12 +139,13 @@ public boolean equals(Object o) {
DecommissionAttributeMetadata that = (DecommissionAttributeMetadata) o;

if (!status.equals(that.status)) return false;
if (!requestID.equals(that.requestID)) return false;
return decommissionAttribute.equals(that.decommissionAttribute);
}

@Override
public int hashCode() {
return Objects.hash(attributeType, decommissionAttribute, status);
return Objects.hash(attributeType, decommissionAttribute, status, requestID);
}

/**
Expand All @@ -152,6 +164,7 @@ public Version getMinimalSupportedVersion() {
public DecommissionAttributeMetadata(StreamInput in) throws IOException {
this.decommissionAttribute = new DecommissionAttribute(in);
this.status = DecommissionStatus.fromString(in.readString());
this.requestID = in.readString();
}

public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -165,12 +178,14 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException
public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeString(status.status());
out.writeString(requestID);
}

public static DecommissionAttributeMetadata fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
DecommissionAttribute decommissionAttribute = null;
DecommissionStatus status = null;
String requestID = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
Expand Down Expand Up @@ -210,6 +225,13 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser)
);
}
status = DecommissionStatus.fromString(parser.text());
} else if ("requestID".equals(currentFieldName)) {
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
throw new OpenSearchParseException(
"failed to parse status of decommissioning, expected string but found unknown type"
);
}
requestID = parser.text();
} else {
throw new OpenSearchParseException(
"unknown field found [{}], failed to parse the decommission attribute",
Expand All @@ -218,15 +240,15 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser)
}
}
}
return new DecommissionAttributeMetadata(decommissionAttribute, status);
return new DecommissionAttributeMetadata(decommissionAttribute, status, requestID);
}

/**
* {@inheritDoc}
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
toXContent(decommissionAttribute, status, attributeType, builder, params);
toXContent(decommissionAttribute, status, requestID, attributeType, builder, params);
return builder;
}

Expand All @@ -245,6 +267,7 @@ public EnumSet<Metadata.XContentContext> context() {
public static void toXContent(
DecommissionAttribute decommissionAttribute,
DecommissionStatus status,
String requestID,
String attributeType,
XContentBuilder builder,
ToXContent.Params params
Expand All @@ -253,6 +276,7 @@ public static void toXContent(
builder.field(decommissionAttribute.attributeName(), decommissionAttribute.attributeValue());
builder.endObject();
builder.field("status", status.status());
builder.field("requestID", requestID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public ClusterState execute(ClusterState currentState) {
decommissionAttributeMetadata.validateNewStatus(decommissionStatus);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttributeMetadata.decommissionAttribute(),
decommissionStatus
decommissionStatus,
decommissionAttributeMetadata.requestID()
);
ClusterState newState = ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
Expand Down
Loading