Skip to content

Commit

Permalink
Add DecommissionService and helper to execute awareness attribute dec…
Browse files Browse the repository at this point in the history
…ommissioning (opensearch-project#4084)

* Add Executor to decommission node attribute
* Decommission service implementation with cluster metadata
* Master abdication changes to decommission local awareness leader
* Update join validator changes to validate decommissioned node join request

Signed-off-by: Rishab Nahata <[email protected]>
Signed-off-by: Vishal Sarda <[email protected]>
  • Loading branch information
imRishN authored and Vishalks committed Sep 28, 2022
1 parent 0217c87 commit 2ca6cfb
Show file tree
Hide file tree
Showing 20 changed files with 2,226 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))


### Deprecated

Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,18 @@ private enum OpenSearchExceptionHandle {
org.opensearch.index.shard.PrimaryShardClosedException::new,
162,
V_3_0_0
),
DECOMMISSIONING_FAILED_EXCEPTION(
org.opensearch.cluster.decommission.DecommissioningFailedException.class,
org.opensearch.cluster.decommission.DecommissioningFailedException::new,
163,
V_3_0_0
),
NODE_DECOMMISSIONED_EXCEPTION(
org.opensearch.cluster.decommission.NodeDecommissionedException.class,
org.opensearch.cluster.decommission.NodeDecommissionedException::new,
164,
V_3_0_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.cluster.action.index.MappingUpdatedAction;
import org.opensearch.cluster.action.index.NodeMappingRefreshAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.metadata.ComponentTemplateMetadata;
import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata;
import org.opensearch.cluster.metadata.DataStreamMetadata;
Expand Down Expand Up @@ -193,6 +194,12 @@ public static List<Entry> getNamedWriteables() {
);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
registerMetadataCustom(entries, WeightedRoutingMetadata.TYPE, WeightedRoutingMetadata::new, WeightedRoutingMetadata::readDiffFrom);
registerMetadataCustom(
entries,
DecommissionAttributeMetadata.TYPE,
DecommissionAttributeMetadata::new,
DecommissionAttributeMetadata::readDiffFrom
);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand Down Expand Up @@ -283,6 +290,13 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
WeightedRoutingMetadata::fromXContent
)
);
entries.add(
new NamedXContentRegistry.Entry(
Metadata.Custom.class,
new ParseField(DecommissionAttributeMetadata.TYPE),
DecommissionAttributeMetadata::fromXContent
)
);
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -358,6 +362,7 @@ public boolean runOnlyOnClusterManager() {

/**
* a task indicates that the current node should become master
*
* @deprecated As of 2.0, because supporting inclusive language, replaced by {@link #newBecomeClusterManagerTask()}
*/
@Deprecated
Expand All @@ -384,8 +389,9 @@ public static Task newFinishElectionTask() {
* Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata
* will not be created with a newer version of opensearch as well as that all indices are newer or equal to the minimum index
* compatibility version.
* @see Version#minimumIndexCompatibilityVersion()
*
* @throws IllegalStateException if any index is incompatible with the given version
* @see Version#minimumIndexCompatibilityVersion()
*/
public static void ensureIndexCompatibility(final Version nodeVersion, Metadata metadata) {
Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion();
Expand Down Expand Up @@ -415,14 +421,18 @@ public static void ensureIndexCompatibility(final Version nodeVersion, Metadata
}
}

/** ensures that the joining node has a version that's compatible with all current nodes*/
/**
* ensures that the joining node has a version that's compatible with all current nodes
*/
public static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) {
final Version minNodeVersion = currentNodes.getMinNodeVersion();
final Version maxNodeVersion = currentNodes.getMaxNodeVersion();
ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion);
}

/** ensures that the joining node has a version that's compatible with a given version range */
/**
* ensures that the joining node has a version that's compatible with a given version range
*/
public static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) {
assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion;
if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) {
Expand Down Expand Up @@ -466,13 +476,34 @@ public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version
}
}

public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) {
DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null) {
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
DecommissionStatus status = decommissionAttributeMetadata.status();
if (decommissionAttribute != null && status != null) {
// We will let the node join the cluster if the current status is in FAILED state
if (node.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue())
&& status.equals(DecommissionStatus.FAILED) == false) {
throw new NodeDecommissionedException(
"node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]",
node.toString(),
decommissionAttribute.toString(),
status.status()
);
}
}
}
}

public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
validators.add((node, state) -> {
ensureNodesCompatibility(node.getVersion(), state.getNodes());
ensureIndexCompatibility(node.getVersion(), state.getMetadata());
ensureNodeCommissioned(node, state.getMetadata());
});
validators.addAll(onJoinValidators);
return Collections.unmodifiableCollection(validators);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.decommission;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Objects;

/**
* {@link DecommissionAttribute} encapsulates information about decommissioned node attribute like attribute name, attribute value.
*
* @opensearch.internal
*/
public final class DecommissionAttribute implements Writeable {
private final String attributeName;
private final String attributeValue;

/**
* Constructs new decommission attribute name value pair
*
* @param attributeName attribute name
* @param attributeValue attribute value
*/
public DecommissionAttribute(String attributeName, String attributeValue) {
this.attributeName = attributeName;
this.attributeValue = attributeValue;
}

/**
* Returns attribute name
*
* @return attributeName
*/
public String attributeName() {
return this.attributeName;
}

/**
* Returns attribute value
*
* @return attributeValue
*/
public String attributeValue() {
return this.attributeValue;
}

public DecommissionAttribute(StreamInput in) throws IOException {
attributeName = in.readString();
attributeValue = in.readString();
}

/**
* Writes decommission attribute name value to stream output
*
* @param out stream output
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(attributeName);
out.writeString(attributeValue);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

DecommissionAttribute that = (DecommissionAttribute) o;

if (!attributeName.equals(that.attributeName)) return false;
return attributeValue.equals(that.attributeValue);
}

@Override
public int hashCode() {
return Objects.hash(attributeName, attributeValue);
}

@Override
public String toString() {
return "DecommissionAttribute{" + "attributeName='" + attributeName + '\'' + ", attributeValue='" + attributeValue + '\'' + '}';
}
}
Loading

0 comments on commit 2ca6cfb

Please sign in to comment.