diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 6d86bb613be43..585987b31d771 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -164,7 +164,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry, this::handlePublishRequest, this::handleApplyCommit); - this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); + this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure); this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; @@ -183,20 +183,14 @@ private ClusterFormationState getClusterFormationState() { StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()), getCurrentTerm()); } - private Runnable getOnLeaderFailure() { - return new Runnable() { - @Override - public void run() { - synchronized (mutex) { - becomeCandidate("onLeaderFailure"); - } - } - - @Override - public String toString() { - return "notification of leader failure"; + private void onLeaderFailure(Exception e) { + synchronized (mutex) { + if (mode != Mode.CANDIDATE) { + assert lastKnownLeader.isPresent(); + logger.info(new ParameterizedMessage("master node [{}] failed, restarting discovery", lastKnownLeader.get()), e); } - }; + becomeCandidate("onLeaderFailure"); + } } private void removeNode(DiscoveryNode discoveryNode, String reason) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index 5bc5ea866ee83..703c08bf260c7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; @@ -33,6 +34,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -48,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; /** * The LeaderChecker is responsible for allowing followers to check that the currently elected leader is still connected and healthy. We are @@ -75,20 +78,17 @@ public class LeaderChecker { public static final Setting LEADER_CHECK_RETRY_COUNT_SETTING = Setting.intSetting("cluster.fault_detection.leader_check.retry_count", 3, 1, Setting.Property.NodeScope); - private final Settings settings; - private final TimeValue leaderCheckInterval; private final TimeValue leaderCheckTimeout; private final int leaderCheckRetryCount; private final TransportService transportService; - private final Runnable onLeaderFailure; + private final Consumer onLeaderFailure; private AtomicReference currentChecker = new AtomicReference<>(); private volatile DiscoveryNodes discoveryNodes; - public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) { - this.settings = settings; + public LeaderChecker(final Settings settings, final TransportService transportService, final Consumer onLeaderFailure) { leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings); leaderCheckTimeout = LEADER_CHECK_TIMEOUT_SETTING.get(settings); leaderCheckRetryCount = LEADER_CHECK_RETRY_COUNT_SETTING.get(settings); @@ -234,16 +234,19 @@ public void handleException(TransportException exp) { } if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { - logger.debug(new ParameterizedMessage("leader [{}] disconnected, failing immediately", leader), exp); - leaderFailed(); + logger.debug(new ParameterizedMessage( + "leader [{}] disconnected during check", leader), exp); + leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp)); return; } long failureCount = failureCountSinceLastSuccess.incrementAndGet(); if (failureCount >= leaderCheckRetryCount) { - logger.debug(new ParameterizedMessage("{} consecutive failures (limit [{}] is {}) so leader [{}] has failed", - failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount, leader), exp); - leaderFailed(); + logger.debug(new ParameterizedMessage( + "leader [{}] has failed {} consecutive checks (limit [{}] is {}); last failure was:", + leader, failureCount, LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), leaderCheckRetryCount), exp); + leaderFailed(new ElasticsearchException( + "node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp)); return; } @@ -259,9 +262,19 @@ public String executor() { }); } - void leaderFailed() { + void leaderFailed(Exception e) { if (isClosed.compareAndSet(false, true)) { - transportService.getThreadPool().generic().execute(onLeaderFailure); + transportService.getThreadPool().generic().execute(new Runnable() { + @Override + public void run() { + onLeaderFailure.accept(e); + } + + @Override + public String toString() { + return "notification of leader failure: " + e.getMessage(); + } + }); } else { logger.trace("already closed, not failing leader"); } @@ -269,7 +282,8 @@ void leaderFailed() { void handleDisconnectedNode(DiscoveryNode discoveryNode) { if (discoveryNode.equals(leader)) { - leaderFailed(); + logger.debug("leader [{}] disconnected", leader); + leaderFailed(new NodeDisconnectedException(discoveryNode, "disconnected")); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java index 7f2512f97f87b..4e90ae02e12ac 100644 --- a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java +++ b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -89,6 +90,13 @@ protected void doRun() throws Exception { remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis()); // success means (amongst other things) that the cluster names match logger.trace("[{}] handshake successful: {}", this, remoteNode); + } catch (Exception e) { + // we opened a connection and successfully performed a low-level handshake, so we were definitely talking to an + // Elasticsearch node, but the high-level handshake failed indicating some kind of mismatched configurations + // (e.g. cluster name) that the user should address + logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e); + listener.onFailure(e); + return; } finally { IOUtils.closeWhileHandlingException(connection); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java index a806cb84a6818..ce25d24bce6ba 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java @@ -52,9 +52,12 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.matchesRegex; import static org.hamcrest.Matchers.nullValue; public class LeaderCheckerTests extends ESTestCase { @@ -146,7 +149,10 @@ public String toString() { final AtomicBoolean leaderFailed = new AtomicBoolean(); final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, - () -> assertTrue(leaderFailed.compareAndSet(false, true))); + e -> { + assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks")); + assertTrue(leaderFailed.compareAndSet(false, true)); + }); logger.info("--> creating first checker"); leaderChecker.updateLeader(leader1); @@ -247,7 +253,10 @@ public String toString() { final AtomicBoolean leaderFailed = new AtomicBoolean(); final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, - () -> assertTrue(leaderFailed.compareAndSet(false, true))); + e -> { + assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check"))); + assertTrue(leaderFailed.compareAndSet(false, true)); + }); leaderChecker.updateLeader(leader); { @@ -316,7 +325,7 @@ public void testLeaderBehaviour() { transportService.start(); transportService.acceptIncomingRequests(); - final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, () -> fail("shouldn't be checking anything")); + final LeaderChecker leaderChecker = new LeaderChecker(settings, transportService, e -> fail("shouldn't be checking anything")); final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();