Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controlling discovery for decommissioned nodes #4590

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
a015a81
Controlling discovery for decommissioned nodes
imRishN Sep 26, 2022
0a3f262
Fix spotless check
imRishN Sep 26, 2022
24fcaaf
Add changelog
imRishN Sep 26, 2022
c9e0f5b
Empty-Commit
imRishN Sep 26, 2022
bb0547d
Add basic UT
imRishN Sep 26, 2022
2ea4dc9
Add Consumer instead of listener
imRishN Sep 28, 2022
2582c4b
Remove UT
imRishN Sep 28, 2022
70b7d48
Refactor
imRishN Sep 28, 2022
a30b83d
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Sep 28, 2022
da2fa40
Fix spotless check
imRishN Sep 28, 2022
61bb893
Improve logging msg
imRishN Sep 28, 2022
d872065
Fix spotless check
imRishN Sep 28, 2022
bee48b4
Add log msg in join helper
imRishN Sep 30, 2022
d02f9ae
Update peer finder interval to 2 min during decommission
imRishN Sep 30, 2022
6c0ce6c
Move flag to coordinator
imRishN Sep 30, 2022
01f2d70
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Sep 30, 2022
8f82360
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 2, 2022
1c497b7
Add UT
imRishN Oct 3, 2022
e4d1354
Fix spotless check
imRishN Oct 3, 2022
1ac690d
Prevent join at join execute task and at coordinator. Add UTs
imRishN Oct 4, 2022
f448d78
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 4, 2022
adc4a8c
Move validator appropriately
imRishN Oct 5, 2022
69f2784
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 5, 2022
c42586b
Fix spotless check
imRishN Oct 5, 2022
a53e578
Make method pkg private
imRishN Oct 5, 2022
edff382
Ensure decommissioned node don't become leader
imRishN Oct 5, 2022
46c49b0
Add static helper for decommission flow
imRishN Oct 6, 2022
63f6c00
Updates in pre voting round
imRishN Oct 6, 2022
b8cf3fe
Move commission check
imRishN Oct 6, 2022
6a4c16f
Move commission check
imRishN Oct 6, 2022
50a6e67
Move helpers to Service
imRishN Oct 6, 2022
c9db6b1
Fix executor
imRishN Oct 6, 2022
bb6f573
Remove UT
imRishN Oct 6, 2022
8dd4457
Fix spotless check
imRishN Oct 6, 2022
19c524d
Minor
imRishN Oct 6, 2022
c5b1c0d
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 6, 2022
41cc099
Add built in join validator
imRishN Oct 6, 2022
260641a
Fix
imRishN Oct 6, 2022
8e161a6
Add UT for join
imRishN Oct 7, 2022
4a932b7
Merge remote-tracking branch 'upstream/main' into decommission/contro…
imRishN Oct 7, 2022
05d073e
Fix spotless check
imRishN Oct 7, 2022
3b6c1b7
Changes in coordinator
imRishN Oct 7, 2022
8b94b8d
Add UT for coordinator
imRishN Oct 7, 2022
165b961
Fix spotless check
imRishN Oct 7, 2022
e4e1b5c
Add test for execute method of task
imRishN Oct 7, 2022
0b3f106
Empty-Commit
imRishN Oct 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261))
- Improve Gradle pre-commit checks to pre-empt Jenkins build ([#4660](https://github.com/opensearch-project/OpenSearch/pull/4660))
- Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))
- Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID;
import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned;
import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
Expand Down Expand Up @@ -139,6 +140,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery

private final Settings settings;
private final boolean singleNodeDiscovery;
private volatile boolean localNodeCommissioned;
private final ElectionStrategy electionStrategy;
private final TransportService transportService;
private final ClusterManagerService clusterManagerService;
Expand Down Expand Up @@ -219,7 +221,8 @@ public Coordinator(
this::joinLeaderInTerm,
this.onJoinValidators,
rerouteService,
nodeHealthService
nodeHealthService,
this::onNodeCommissionStatusChange
);
this.persistedStateSupplier = persistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
Expand Down Expand Up @@ -282,6 +285,7 @@ public Coordinator(
joinHelper::logLastFailedJoinAttempt
);
this.nodeHealthService = nodeHealthService;
this.localNodeCommissioned = true;
}

private ClusterFormationState getClusterFormationState() {
Expand Down Expand Up @@ -597,6 +601,9 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
joinRequest.getSourceNode().getVersion(),
stateForJoinValidation.getNodes().getMinNodeVersion()
);
// we are checking source node commission status here to reject any join request coming from a decommissioned node
// even before executing the join task to fail fast
JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata());
imRishN marked this conversation as resolved.
Show resolved Hide resolved
}
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
} else {
Expand Down Expand Up @@ -1425,6 +1432,17 @@ protected void onFoundPeersUpdated() {
}
}

// package-visible for testing
synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
this.localNodeCommissioned = localNodeCommissioned;
peerFinder.onNodeCommissionStatusChange(localNodeCommissioned);
}

// package-visible for testing
boolean localNodeCommissioned() {
return localNodeCommissioned;
}

private void startElectionScheduler() {
assert electionScheduler == null : electionScheduler;

Expand All @@ -1451,6 +1469,14 @@ public void run() {
return;
}

// if either the localNodeCommissioned flag or the last accepted state thinks it should skip pre voting, we will
// acknowledge it
if (nodeCommissioned(lastAcceptedState.nodes().getLocalNode(), lastAcceptedState.metadata()) == false
|| localNodeCommissioned == false) {
logger.debug("skip prevoting as local node is decommissioned");
return;
}

if (prevotingRound != null) {
prevotingRound.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.ClusterStateTaskListener;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
Expand All @@ -57,6 +58,7 @@
import org.opensearch.monitor.StatusInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
Expand All @@ -78,6 +80,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -118,6 +121,7 @@ public class JoinHelper {
private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();

private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
private final Consumer<Boolean> nodeCommissioned;

JoinHelper(
Settings settings,
Expand All @@ -130,12 +134,14 @@ public class JoinHelper {
Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
RerouteService rerouteService,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
Consumer<Boolean> nodeCommissioned
) {
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissioned = nodeCommissioned;
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) {

private final long term = currentTermSupplier.getAsLong();
Expand Down Expand Up @@ -342,6 +348,7 @@ public void handleResponse(Empty response) {
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
nodeCommissioned.accept(true);
imRishN marked this conversation as resolved.
Show resolved Hide resolved
onCompletion.run();
}

Expand All @@ -352,6 +359,13 @@ public void handleException(TransportException exp) {
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
attempt.logNow();
lastFailedJoinAttempt.set(attempt);
if (exp instanceof RemoteTransportException && (exp.getCause() instanceof NodeDecommissionedException)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current flow is:

  1. Node sends the JoinRequest to active leader
  2. Active leader sends the ValidateJoinRequest
  3. Node checks for decommissioning and will fail the request if node has decommissioning attribute

The above logic will not work if the current master eligible node is not active leader and is a candidate accumulating joins Code Ref

 if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) {
    .....
   sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
 } else {
    processJoinRequest(joinRequest, joinCallback);
   }

Suggestion:
Ideally the decommissioning check should be executed at

  1. During handleJoinRequest in the Coordinator. This is lightweight check DiscoveryNode would be sending attribute information during join. This will reject any joins from decommissioned node early on itself.
  2. During JoinExecutor submittask as well as that will ensure correctness. It will never allow a node with decommission attribute to join the cluster.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, can you please help me understand that when current leader eligible node is not active leader and is just accumulating joins, we just skip the validator code flow?

Copy link
Member Author

@imRishN imRishN Oct 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above logic will not work if the current master eligible node is not active leader and is a candidate accumulating joins Code Ref

Thanks for pointing this out @shwetathareja. As per your suggestions, implementing the changes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made these changes.

  1. A check in Coordinator#handleJoinRequest (when the state doesn't have STATE_NOT_RECOVERED_BLOCK)
  2. Check in the execute method of JoinTask
  3. Removed it from builtin validators

@shwetathareja , lmk if this looks ok

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I explored more on join validators -

During handleJoinRequest in the Coordinator. This is lightweight check DiscoveryNode would be sending attribute information during join. This will reject any joins from decommissioned node early on itself.

I think we can keep it as built in join validators, and Coordinator#handleJoinRequest validates all the checks here. So handle join request would be covered here. Another case I see where keeping it at BuiltinValidator would help is during handlePublishRequest. Ideally we would never hit this case as it runs only on Mode.Leader but I think for any such cases in future, it would be good to have all the validators at one place

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Built validator delays the check which can be checked early on during handleJoinRequest itself. Can you help me understand the handlePublishRequest scenario?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
            final ClusterState stateForJoinValidation = getStateForClusterManagerService();

            if (stateForJoinValidation.nodes().isLocalNodeElectedClusterManager()) {
                onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
                if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
                    // we do this in a couple of places including the cluster update thread. This one here is really just best effort
                    // to ensure we fail as fast as possible.
                    JoinTaskExecutor.ensureMajorVersionBarrier(
                        joinRequest.getSourceNode().getVersion(),
                        stateForJoinValidation.getNodes().getMinNodeVersion()
                    );
                    // we are checking source node commission status here to reject any join request coming from a decommissioned node
                    // even before executing the join task to fail fast
                    JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata());
                }
                sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
            } else {
                processJoinRequest(joinRequest, joinCallback);
            }
        }, joinCallback::onFailure));

This is the snippet from handleJoinRequest of Coordinator. If you see, this method is also running the same built in validators onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));. And hence the concern with early checking is actually already resolved here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might never actually hit the handlePublishRequest scenario as this would be run only for Leader mode. But what I was trying to say is plugins put their join validators in onJoinValidators and then Coordinator adds all other validators to this onJoinValidators. And then in handlePublishRequest and handleJoinRequest we validate with whatever validators onJoinValidators has. Hence, keeping all the validators centrally in onJoinValidators might be good

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handlePublishRequest is executed on the node joining the master. But with the check inside handleJoinRequest master will reject the joins upfront.

Copy link
Member Author

@imRishN imRishN Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. I meant the built in validators are also executed in handleJoinRequest

logger.info(
"local node is decommissioned [{}]. Will not be able to join the cluster",
exp.getCause().getMessage()
);
nodeCommissioned.accept(false);
}
onCompletion.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -64,6 +61,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

/**
Expand Down Expand Up @@ -196,14 +194,17 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
ensureIndexCompatibility(node.getVersion(), currentState.getMetadata());
// we have added the same check in handleJoinRequest method and adding it here as this method
// would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness
ensureNodeCommissioned(node, currentState.metadata());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests for these were added as part of #4084. Adding it below again for ref -

testJoinClusterWithNoDecommission
testPreventJoinClusterWithDecommission
testJoinClusterWithDifferentDecommission
testJoinClusterWithDecommissionFailed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add test in JoinTaskExecutorTests class to ensure this exist in the executor method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above mentioned tests are in JoinTaskExecutorTests. Added one more to test validator in execute method

nodesBuilder.add(node);
nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
if (node.isClusterManagerNode()) {
joiniedNodeNameIds.put(node.getName(), node.getId());
}
} catch (IllegalArgumentException | IllegalStateException e) {
} catch (IllegalArgumentException | IllegalStateException | NodeDecommissionedException e) {
results.failure(joinTask, e);
continue;
}
Expand Down Expand Up @@ -477,22 +478,13 @@ public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version
}

public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) {
DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null) {
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
DecommissionStatus status = decommissionAttributeMetadata.status();
if (decommissionAttribute != null && status != null) {
// We will let the node join the cluster if the current status is in FAILED state
if (node.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue())
&& (status.equals(DecommissionStatus.IN_PROGRESS) || status.equals(DecommissionStatus.SUCCESSFUL))) {
throw new NodeDecommissionedException(
"node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]",
node.toString(),
decommissionAttribute.toString(),
status.status()
);
}
}
if (nodeCommissioned(node, metadata) == false) {
throw new NodeDecommissionedException(
"node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]",
node.toString(),
metadata.decommissionAttributeMetadata().decommissionAttribute().toString(),
metadata.decommissionAttributeMetadata().status().status()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,6 @@ private Set<DiscoveryNode> filterNodesWithDecommissionAttribute(
return nodesWithDecommissionAttribute;
}

private static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) {
return discoveryNode.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue());
}

private static void validateAwarenessAttribute(
final DecommissionAttribute decommissionAttribute,
List<String> awarenessAttributes,
Expand Down Expand Up @@ -531,4 +527,38 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});
}

/**
* Utility method to check if the node has decommissioned attribute
*
* @param discoveryNode node to check on
* @param decommissionAttribute attribute to be checked with
* @return true or false based on whether node has decommissioned attribute
*/
public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) {
String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName());
return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue());
}

/**
* Utility method to check if the node is commissioned or not
*
* @param discoveryNode node to check on
* @param metadata metadata present current which will be used to check the commissioning status of the node
* @return if the node is commissioned or not
*/
public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) {
DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata();
if (decommissionAttributeMetadata != null) {
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();
DecommissionStatus status = decommissionAttributeMetadata.status();
if (decommissionAttribute != null && status != null) {
if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute)
&& (status.equals(DecommissionStatus.IN_PROGRESS) || status.equals(DecommissionStatus.SUCCESSFUL))) {
return false;
}
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ public void apply(Settings value, Settings current, Settings previous) {
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING,
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
Expand Down
29 changes: 28 additions & 1 deletion server/src/main/java/org/opensearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,23 @@ public abstract class PeerFinder {
Setting.Property.NodeScope
);

// the time between attempts to find all peers when node is in decommissioned state, default set to 2 minutes
public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING = Setting.timeSetting(
"discovery.find_peers_interval_during_decommission",
TimeValue.timeValueSeconds(120L),
TimeValue.timeValueMillis(1000),
Setting.Property.NodeScope
);

public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = Setting.timeSetting(
"discovery.request_peers_timeout",
TimeValue.timeValueMillis(3000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
);

private final TimeValue findPeersInterval;
private final Settings settings;
private TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;

private final Object mutex = new Object();
Expand All @@ -112,6 +121,7 @@ public PeerFinder(
TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver
) {
this.settings = settings;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
Expand All @@ -128,6 +138,23 @@ public PeerFinder(
);
}

public synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
findPeersInterval = localNodeCommissioned
? DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings)
: DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings);
logger.info(
"setting findPeersInterval to [{}] as node commission status = [{}] for local node [{}]",
findPeersInterval,
localNodeCommissioned,
transportService.getLocalNode()
);
}

// package private for tests
TimeValue getFindPeersInterval() {
return findPeersInterval;
}

public void activate(final DiscoveryNodes lastAcceptedNodes) {
logger.trace("activating with {}", lastAcceptedNodes);

Expand Down
Loading