diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 16e75464caf0f..479de5db45a18 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -495,6 +495,8 @@ public void onFailure(Exception e) { private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) { final Optional optionalJoin = joinRequest.getOptionalJoin(); synchronized (mutex) { + updateMaxTermSeen(joinRequest.getTerm()); + final CoordinationState coordState = coordinationState.get(); final boolean prevElectionWon = coordState.electionWon(); @@ -1115,7 +1117,7 @@ private class CoordinatorPeerFinder extends PeerFinder { protected void onActiveMasterFound(DiscoveryNode masterNode, long term) { synchronized (mutex) { ensureTermAtLeast(masterNode, term); - joinHelper.sendJoinRequest(masterNode, joinWithDestination(lastJoin, masterNode, term)); + joinHelper.sendJoinRequest(masterNode, getCurrentTerm(), joinWithDestination(lastJoin, masterNode, term)); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 2e47acf07b215..03f7b37d2b835 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -130,7 +130,7 @@ public ClusterTasksResult execute(ClusterState currentSta StartJoinRequest::new, (request, channel, task) -> { final DiscoveryNode destination = request.getSourceNode(); - sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request))); + sendJoinRequest(destination, currentTermSupplier.getAsLong(), Optional.of(joinLeaderInTerm.apply(request))); channel.sendResponse(Empty.INSTANCE); }); @@ -230,9 +230,9 @@ void logLastFailedJoinAttempt() { } } - public void sendJoinRequest(DiscoveryNode destination, Optional optionalJoin) { + public void sendJoinRequest(DiscoveryNode destination, long term, Optional optionalJoin) { assert destination.isMasterNode() : "trying to join master-ineligible " + destination; - final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin); + final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin); final Tuple dedupKey = Tuple.tuple(destination, joinRequest); if (pendingOutgoingJoins.add(dedupKey)) { logger.debug("attempting to join {} with {}", destination, joinRequest); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java index 091a6809c84dc..d74c3d7cf4174 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java @@ -18,29 +18,53 @@ */ package org.elasticsearch.cluster.coordination; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; +import java.util.Objects; import java.util.Optional; public class JoinRequest extends TransportRequest { + /** + * The sending (i.e. joining) node. + */ private final DiscoveryNode sourceNode; + /** + * The minimum term for which the joining node will accept any cluster state publications. If the joining node is in a strictly greater + * term than the master it wants to join then the master must enter a new term and hold another election. Doesn't necessarily match + * {@link JoinRequest#optionalJoin} and may be zero in join requests sent prior to {@link Version#V_8_0_0}. + */ + private final long minimumTerm; + + /** + * A vote for the receiving node. This vote is optional since the sending node may have voted for a different master in this term. + * That's ok, the sender likely discovered that the master we voted for lost the election and now we're trying to join the winner. Once + * the sender has successfully joined the master, the lack of a vote in its term causes another election (see + * {@link Publication#onMissingJoin(DiscoveryNode)}). + */ private final Optional optionalJoin; - public JoinRequest(DiscoveryNode sourceNode, Optional optionalJoin) { + public JoinRequest(DiscoveryNode sourceNode, long minimumTerm, Optional optionalJoin) { assert optionalJoin.isPresent() == false || optionalJoin.get().getSourceNode().equals(sourceNode); this.sourceNode = sourceNode; + this.minimumTerm = minimumTerm; this.optionalJoin = optionalJoin; } public JoinRequest(StreamInput in) throws IOException { super(in); sourceNode = new DiscoveryNode(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + minimumTerm = in.readLong(); + } else { + minimumTerm = 0L; + } optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new)); } @@ -48,6 +72,9 @@ public JoinRequest(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); sourceNode.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeLong(minimumTerm); + } out.writeOptionalWriteable(optionalJoin.orElse(null)); } @@ -55,6 +82,17 @@ public DiscoveryNode getSourceNode() { return sourceNode; } + public long getMinimumTerm() { + return minimumTerm; + } + + public long getTerm() { + // If the join is also present then its term will normally equal the corresponding term, but we do not require callers to + // obtain the term and the join in a synchronized fashion so it's possible that they disagree. Also older nodes do not share the + // minimum term, so for BWC we can take it from the join if present. + return Math.max(minimumTerm, optionalJoin.map(Join::getTerm).orElse(0L)); + } + public Optional getOptionalJoin() { return optionalJoin; } @@ -66,21 +104,21 @@ public boolean equals(Object o) { JoinRequest that = (JoinRequest) o; + if (minimumTerm != that.minimumTerm) return false; if (!sourceNode.equals(that.sourceNode)) return false; return optionalJoin.equals(that.optionalJoin); } @Override public int hashCode() { - int result = sourceNode.hashCode(); - result = 31 * result + optionalJoin.hashCode(); - return result; + return Objects.hash(sourceNode, minimumTerm, optionalJoin); } @Override public String toString() { return "JoinRequest{" + "sourceNode=" + sourceNode + + ", minimumTerm=" + minimumTerm + ", optionalJoin=" + optionalJoin + '}'; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index ef7567ea5df91..a800a7adf093b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -68,7 +68,7 @@ public void testJoinDeduplication() { // check that sending a join to node1 works Optional optionalJoin1 = randomBoolean() ? Optional.empty() : Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); - joinHelper.sendJoinRequest(node1, optionalJoin1); + joinHelper.sendJoinRequest(node1, 0L, optionalJoin1); CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear(); assertThat(capturedRequests1.length, equalTo(1)); CapturedRequest capturedRequest1 = capturedRequests1[0]; @@ -79,14 +79,14 @@ public void testJoinDeduplication() { // check that sending a join to node2 works Optional optionalJoin2 = randomBoolean() ? Optional.empty() : Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); - joinHelper.sendJoinRequest(node2, optionalJoin2); + joinHelper.sendJoinRequest(node2, 0L, optionalJoin2); CapturedRequest[] capturedRequests2 = capturingTransport.getCapturedRequestsAndClear(); assertThat(capturedRequests2.length, equalTo(1)); CapturedRequest capturedRequest2 = capturedRequests2[0]; assertEquals(node2, capturedRequest2.node); // check that sending another join to node1 is a noop as the previous join is still in progress - joinHelper.sendJoinRequest(node1, optionalJoin1); + joinHelper.sendJoinRequest(node1, 0L, optionalJoin1); assertThat(capturingTransport.getCapturedRequestsAndClear().length, equalTo(0)); // complete the previous join to node1 @@ -97,7 +97,7 @@ public void testJoinDeduplication() { } // check that sending another join to node1 now works again - joinHelper.sendJoinRequest(node1, optionalJoin1); + joinHelper.sendJoinRequest(node1, 0L, optionalJoin1); CapturedRequest[] capturedRequests1a = capturingTransport.getCapturedRequestsAndClear(); assertThat(capturedRequests1a.length, equalTo(1)); CapturedRequest capturedRequest1a = capturedRequests1a[0]; @@ -106,7 +106,7 @@ public void testJoinDeduplication() { // check that sending another join to node2 works if the optionalJoin is different Optional optionalJoin2a = optionalJoin2.isPresent() && randomBoolean() ? Optional.empty() : Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); - joinHelper.sendJoinRequest(node2, optionalJoin2a); + joinHelper.sendJoinRequest(node2, 0L, optionalJoin2a); CapturedRequest[] capturedRequests2a = capturingTransport.getCapturedRequestsAndClear(); assertThat(capturedRequests2a.length, equalTo(1)); CapturedRequest capturedRequest2a = capturedRequests2a[0]; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java index cb89c7c0fea33..fb99649892ae2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java @@ -179,13 +179,18 @@ public void testJoinRequestEqualsHashCodeSerialization() { Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); JoinRequest initialJoinRequest = new JoinRequest(initialJoin.getSourceNode(), - randomBoolean() ? Optional.empty() : Optional.of(initialJoin)); + randomNonNegativeLong(), randomBoolean() ? Optional.empty() : Optional.of(initialJoin)); // Note: the explicit cast of the CopyFunction is needed for some IDE (specifically Eclipse 4.8.0) to infer the right type EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoinRequest, (CopyFunction) joinRequest -> copyWriteable(joinRequest, writableRegistry(), JoinRequest::new), joinRequest -> { if (randomBoolean() && joinRequest.getOptionalJoin().isPresent() == false) { - return new JoinRequest(createNode(randomAlphaOfLength(20)), joinRequest.getOptionalJoin()); + return new JoinRequest(createNode(randomAlphaOfLength(10)), + joinRequest.getMinimumTerm(), joinRequest.getOptionalJoin()); + } else if (randomBoolean()) { + return new JoinRequest(joinRequest.getSourceNode(), + randomValueOtherThan(joinRequest.getMinimumTerm(), ESTestCase::randomNonNegativeLong), + joinRequest.getOptionalJoin()); } else { // change OptionalJoin final Optional newOptionalJoin; @@ -195,7 +200,7 @@ public void testJoinRequestEqualsHashCodeSerialization() { newOptionalJoin = Optional.of(new Join(joinRequest.getSourceNode(), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())); } - return new JoinRequest(joinRequest.getSourceNode(), newOptionalJoin); + return new JoinRequest(joinRequest.getSourceNode(), joinRequest.getMinimumTerm(), newOptionalJoin); } }); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 60575f7487676..ac3075597dc57 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -74,6 +74,7 @@ import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class NodeJoinTests extends ESTestCase { @@ -278,7 +279,8 @@ public void testJoinWithHigherTermElectsLeader() { assertFalse(isLocalNodeElectedMaster()); assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId()); long newTerm = initialTerm + randomLongBetween(1, 10); - SimpleFuture fut = joinNodeAsync(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); + SimpleFuture fut = joinNodeAsync(new JoinRequest(node1, newTerm, + Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); assertEquals(Coordinator.Mode.LEADER, coordinator.getMode()); assertNull(coordinator.getStateForMasterService().nodes().getMasterNodeId()); deterministicTaskQueue.runAllRunnableTasks(); @@ -298,7 +300,8 @@ public void testJoinWithHigherTermButBetterStateGetsRejected() { long newTerm = initialTerm + randomLongBetween(1, 10); long higherVersion = initialVersion + randomLongBetween(1, 10); expectThrows(CoordinationStateRejectedException.class, - () -> joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion))))); + () -> joinNodeAndRun(new JoinRequest(node1, newTerm, + Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion))))); assertFalse(isLocalNodeElectedMaster()); } @@ -312,7 +315,7 @@ public void testJoinWithHigherTermButBetterStateStillElectsMasterThroughSelfJoin assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); long higherVersion = initialVersion + randomLongBetween(1, 10); - joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion)))); + joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.of(new Join(node1, node0, newTerm, initialTerm, higherVersion)))); assertTrue(isLocalNodeElectedMaster()); } @@ -325,14 +328,32 @@ public void testJoinElectedLeader() { new VotingConfiguration(Collections.singleton(node0.getId())))); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); - joinNodeAndRun(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); assertFalse(clusterStateHasNode(node1)); - joinNodeAndRun(new JoinRequest(node1, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); + joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); assertTrue(clusterStateHasNode(node1)); } + public void testJoinElectedLeaderWithHigherTerm() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, + new VotingConfiguration(Collections.singleton(node0.getId())))); + long newTerm = initialTerm + randomLongBetween(1, 10); + + joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + + long newerTerm = newTerm + randomLongBetween(1, 10); + joinNodeAndRun(new JoinRequest(node1, newerTerm, Optional.empty())); + assertThat(coordinator.getCurrentTerm(), greaterThanOrEqualTo(newerTerm)); + assertTrue(isLocalNodeElectedMaster()); + } + public void testJoinAccumulation() { DiscoveryNode node0 = newNode(0, true); DiscoveryNode node1 = newNode(1, true); @@ -343,17 +364,17 @@ public void testJoinAccumulation() { new VotingConfiguration(Collections.singleton(node2.getId())))); assertFalse(isLocalNodeElectedMaster()); long newTerm = initialTerm + randomLongBetween(1, 10); - SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, Optional.of( + SimpleFuture futNode0 = joinNodeAsync(new JoinRequest(node0, newTerm, Optional.of( new Join(node0, node0, newTerm, initialTerm, initialVersion)))); deterministicTaskQueue.runAllRunnableTasks(); assertFalse(futNode0.isDone()); assertFalse(isLocalNodeElectedMaster()); - SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, Optional.of( + SimpleFuture futNode1 = joinNodeAsync(new JoinRequest(node1, newTerm, Optional.of( new Join(node1, node0, newTerm, initialTerm, initialVersion)))); deterministicTaskQueue.runAllRunnableTasks(); assertFalse(futNode1.isDone()); assertFalse(isLocalNodeElectedMaster()); - joinNodeAndRun(new JoinRequest(node2, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion)))); + joinNodeAndRun(new JoinRequest(node2, newTerm, Optional.of(new Join(node2, node0, newTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); assertTrue(clusterStateHasNode(node1)); assertTrue(clusterStateHasNode(node2)); @@ -372,7 +393,7 @@ public void testJoinFollowerWithHigherTerm() throws Exception { handleStartJoinFrom(node1, newTerm); handleFollowerCheckFrom(node1, newTerm); long newerTerm = newTerm + randomLongBetween(1, 10); - joinNodeAndRun(new JoinRequest(node1, + joinNodeAndRun(new JoinRequest(node1, newerTerm, Optional.of(new Join(node1, node0, newerTerm, initialTerm, initialVersion)))); assertTrue(isLocalNodeElectedMaster()); } @@ -447,7 +468,7 @@ public void testJoinFollowerFails() throws Exception { handleStartJoinFrom(node1, newTerm); handleFollowerCheckFrom(node1, newTerm); assertThat(expectThrows(CoordinationStateRejectedException.class, - () -> joinNodeAndRun(new JoinRequest(node1, Optional.empty()))).getMessage(), + () -> joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.empty()))).getMessage(), containsString("join target is a follower")); assertFalse(isLocalNodeElectedMaster()); } @@ -460,7 +481,8 @@ public void testBecomeFollowerFailsPendingJoin() throws Exception { setupFakeMasterServiceAndCoordinator(initialTerm, initialState(node0, initialTerm, initialVersion, new VotingConfiguration(Collections.singleton(node1.getId())))); long newTerm = initialTerm + randomLongBetween(1, 10); - SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + SimpleFuture fut = joinNodeAsync(new JoinRequest(node0, newTerm, + Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); deterministicTaskQueue.runAllRunnableTasks(); assertFalse(fut.isDone()); assertFalse(isLocalNodeElectedMaster()); @@ -501,7 +523,7 @@ public void testConcurrentJoining() { logger.info("Successful voting nodes: {}", successfulNodes); List correctJoinRequests = successfulNodes.stream().map( - node -> new JoinRequest(node, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion)))) + node -> new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion)))) .collect(Collectors.toList()); List possiblyUnsuccessfulNodes = new ArrayList<>(allNodes); @@ -512,15 +534,15 @@ public void testConcurrentJoining() { List possiblyFailingJoinRequests = possiblyUnsuccessfulNodes.stream().map(node -> { if (randomBoolean()) { // a correct request - return new JoinRequest(node, Optional.of(new Join(node, localNode, + return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion))); } else if (randomBoolean()) { // term too low - return new JoinRequest(node, Optional.of(new Join(node, localNode, + return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, randomLongBetween(0, initialTerm), initialTerm, initialVersion))); } else { // better state - return new JoinRequest(node, Optional.of(new Join(node, localNode, + return new JoinRequest(node, newTerm, Optional.of(new Join(node, localNode, newTerm, initialTerm, initialVersion + randomLongBetween(1, 10)))); } }).collect(Collectors.toList()); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java index e44805744aa5c..f58753c42c4da 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java @@ -105,7 +105,7 @@ public void testHandleNodeJoin_incompatibleClusterState() final CompletableFuture future = new CompletableFuture<>(); DiscoveryNode node = state.nodes().getLocalNode(); - coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, Optional.empty()), + coordinator.sendValidateJoinRequest(stateWithCustomMetaData, new JoinRequest(node, 0L, Optional.empty()), new JoinHelper.JoinCallback() { @Override public void onSuccess() {