Skip to content

Commit 9dcd88e

Browse files
authored
Allow joining node to trigger term bump (#53338)
In rare circumstances it is possible for an isolated node to have a greater term than the currently-elected leader. Today such a node will attempt to join the cluster but will not offer a vote to the leader and will reject its cluster state publications due to their stale term. This situation persists since there is no mechanism for the joining node to inform the leader that its term is stale and a new election is required. This commit adds the current term of the joining node to the join request. Once the join has been validated, the leader will perform another election to increase its term far enough to allow the isolated node to join properly. Fixes #53271
1 parent 377539e commit 9dcd88e

File tree

7 files changed

+99
-32
lines changed

7 files changed

+99
-32
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ public void onFailure(Exception e) {
495495
private void processJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
496496
final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
497497
synchronized (mutex) {
498+
updateMaxTermSeen(joinRequest.getTerm());
499+
498500
final CoordinationState coordState = coordinationState.get();
499501
final boolean prevElectionWon = coordState.electionWon();
500502

@@ -1115,7 +1117,7 @@ private class CoordinatorPeerFinder extends PeerFinder {
11151117
protected void onActiveMasterFound(DiscoveryNode masterNode, long term) {
11161118
synchronized (mutex) {
11171119
ensureTermAtLeast(masterNode, term);
1118-
joinHelper.sendJoinRequest(masterNode, joinWithDestination(lastJoin, masterNode, term));
1120+
joinHelper.sendJoinRequest(masterNode, getCurrentTerm(), joinWithDestination(lastJoin, masterNode, term));
11191121
}
11201122
}
11211123

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
130130
StartJoinRequest::new,
131131
(request, channel, task) -> {
132132
final DiscoveryNode destination = request.getSourceNode();
133-
sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
133+
sendJoinRequest(destination, currentTermSupplier.getAsLong(), Optional.of(joinLeaderInTerm.apply(request)));
134134
channel.sendResponse(Empty.INSTANCE);
135135
});
136136

@@ -230,9 +230,9 @@ void logLastFailedJoinAttempt() {
230230
}
231231
}
232232

233-
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
233+
public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
234234
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
235-
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
235+
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
236236
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
237237
if (pendingOutgoingJoins.add(dedupKey)) {
238238
logger.debug("attempting to join {} with {}", destination, joinRequest);

server/src/main/java/org/elasticsearch/cluster/coordination/JoinRequest.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,81 @@
1818
*/
1919
package org.elasticsearch.cluster.coordination;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.cluster.node.DiscoveryNode;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.transport.TransportRequest;
2526

2627
import java.io.IOException;
28+
import java.util.Objects;
2729
import java.util.Optional;
2830

2931
public class JoinRequest extends TransportRequest {
3032

33+
/**
34+
* The sending (i.e. joining) node.
35+
*/
3136
private final DiscoveryNode sourceNode;
3237

38+
/**
39+
* The minimum term for which the joining node will accept any cluster state publications. If the joining node is in a strictly greater
40+
* term than the master it wants to join then the master must enter a new term and hold another election. Doesn't necessarily match
41+
* {@link JoinRequest#optionalJoin} and may be zero in join requests sent prior to {@link Version#V_8_0_0}.
42+
*/
43+
private final long minimumTerm;
44+
45+
/**
46+
* A vote for the receiving node. This vote is optional since the sending node may have voted for a different master in this term.
47+
* That's ok, the sender likely discovered that the master we voted for lost the election and now we're trying to join the winner. Once
48+
* the sender has successfully joined the master, the lack of a vote in its term causes another election (see
49+
* {@link Publication#onMissingJoin(DiscoveryNode)}).
50+
*/
3351
private final Optional<Join> optionalJoin;
3452

35-
public JoinRequest(DiscoveryNode sourceNode, Optional<Join> optionalJoin) {
53+
public JoinRequest(DiscoveryNode sourceNode, long minimumTerm, Optional<Join> optionalJoin) {
3654
assert optionalJoin.isPresent() == false || optionalJoin.get().getSourceNode().equals(sourceNode);
3755
this.sourceNode = sourceNode;
56+
this.minimumTerm = minimumTerm;
3857
this.optionalJoin = optionalJoin;
3958
}
4059

4160
public JoinRequest(StreamInput in) throws IOException {
4261
super(in);
4362
sourceNode = new DiscoveryNode(in);
63+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
64+
minimumTerm = in.readLong();
65+
} else {
66+
minimumTerm = 0L;
67+
}
4468
optionalJoin = Optional.ofNullable(in.readOptionalWriteable(Join::new));
4569
}
4670

4771
@Override
4872
public void writeTo(StreamOutput out) throws IOException {
4973
super.writeTo(out);
5074
sourceNode.writeTo(out);
75+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
76+
out.writeLong(minimumTerm);
77+
}
5178
out.writeOptionalWriteable(optionalJoin.orElse(null));
5279
}
5380

5481
public DiscoveryNode getSourceNode() {
5582
return sourceNode;
5683
}
5784

85+
public long getMinimumTerm() {
86+
return minimumTerm;
87+
}
88+
89+
public long getTerm() {
90+
// If the join is also present then its term will normally equal the corresponding term, but we do not require callers to
91+
// obtain the term and the join in a synchronized fashion so it's possible that they disagree. Also older nodes do not share the
92+
// minimum term, so for BWC we can take it from the join if present.
93+
return Math.max(minimumTerm, optionalJoin.map(Join::getTerm).orElse(0L));
94+
}
95+
5896
public Optional<Join> getOptionalJoin() {
5997
return optionalJoin;
6098
}
@@ -66,21 +104,21 @@ public boolean equals(Object o) {
66104

67105
JoinRequest that = (JoinRequest) o;
68106

107+
if (minimumTerm != that.minimumTerm) return false;
69108
if (!sourceNode.equals(that.sourceNode)) return false;
70109
return optionalJoin.equals(that.optionalJoin);
71110
}
72111

73112
@Override
74113
public int hashCode() {
75-
int result = sourceNode.hashCode();
76-
result = 31 * result + optionalJoin.hashCode();
77-
return result;
114+
return Objects.hash(sourceNode, minimumTerm, optionalJoin);
78115
}
79116

80117
@Override
81118
public String toString() {
82119
return "JoinRequest{" +
83120
"sourceNode=" + sourceNode +
121+
", minimumTerm=" + minimumTerm +
84122
", optionalJoin=" + optionalJoin +
85123
'}';
86124
}

server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void testJoinDeduplication() {
6868
// check that sending a join to node1 works
6969
Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() :
7070
Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
71-
joinHelper.sendJoinRequest(node1, optionalJoin1);
71+
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
7272
CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear();
7373
assertThat(capturedRequests1.length, equalTo(1));
7474
CapturedRequest capturedRequest1 = capturedRequests1[0];
@@ -79,14 +79,14 @@ public void testJoinDeduplication() {
7979
// check that sending a join to node2 works
8080
Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() :
8181
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
82-
joinHelper.sendJoinRequest(node2, optionalJoin2);
82+
joinHelper.sendJoinRequest(node2, 0L, optionalJoin2);
8383
CapturedRequest[] capturedRequests2 = capturingTransport.getCapturedRequestsAndClear();
8484
assertThat(capturedRequests2.length, equalTo(1));
8585
CapturedRequest capturedRequest2 = capturedRequests2[0];
8686
assertEquals(node2, capturedRequest2.node);
8787

8888
// check that sending another join to node1 is a noop as the previous join is still in progress
89-
joinHelper.sendJoinRequest(node1, optionalJoin1);
89+
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
9090
assertThat(capturingTransport.getCapturedRequestsAndClear().length, equalTo(0));
9191

9292
// complete the previous join to node1
@@ -97,7 +97,7 @@ public void testJoinDeduplication() {
9797
}
9898

9999
// check that sending another join to node1 now works again
100-
joinHelper.sendJoinRequest(node1, optionalJoin1);
100+
joinHelper.sendJoinRequest(node1, 0L, optionalJoin1);
101101
CapturedRequest[] capturedRequests1a = capturingTransport.getCapturedRequestsAndClear();
102102
assertThat(capturedRequests1a.length, equalTo(1));
103103
CapturedRequest capturedRequest1a = capturedRequests1a[0];
@@ -106,7 +106,7 @@ public void testJoinDeduplication() {
106106
// check that sending another join to node2 works if the optionalJoin is different
107107
Optional<Join> optionalJoin2a = optionalJoin2.isPresent() && randomBoolean() ? Optional.empty() :
108108
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
109-
joinHelper.sendJoinRequest(node2, optionalJoin2a);
109+
joinHelper.sendJoinRequest(node2, 0L, optionalJoin2a);
110110
CapturedRequest[] capturedRequests2a = capturingTransport.getCapturedRequestsAndClear();
111111
assertThat(capturedRequests2a.length, equalTo(1));
112112
CapturedRequest capturedRequest2a = capturedRequests2a[0];

server/src/test/java/org/elasticsearch/cluster/coordination/MessagesTests.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,18 @@ public void testJoinRequestEqualsHashCodeSerialization() {
179179
Join initialJoin = new Join(createNode(randomAlphaOfLength(10)), createNode(randomAlphaOfLength(10)), randomNonNegativeLong(),
180180
randomNonNegativeLong(), randomNonNegativeLong());
181181
JoinRequest initialJoinRequest = new JoinRequest(initialJoin.getSourceNode(),
182-
randomBoolean() ? Optional.empty() : Optional.of(initialJoin));
182+
randomNonNegativeLong(), randomBoolean() ? Optional.empty() : Optional.of(initialJoin));
183183
// Note: the explicit cast of the CopyFunction is needed for some IDE (specifically Eclipse 4.8.0) to infer the right type
184184
EqualsHashCodeTestUtils.checkEqualsAndHashCode(initialJoinRequest,
185185
(CopyFunction<JoinRequest>) joinRequest -> copyWriteable(joinRequest, writableRegistry(), JoinRequest::new),
186186
joinRequest -> {
187187
if (randomBoolean() && joinRequest.getOptionalJoin().isPresent() == false) {
188-
return new JoinRequest(createNode(randomAlphaOfLength(20)), joinRequest.getOptionalJoin());
188+
return new JoinRequest(createNode(randomAlphaOfLength(10)),
189+
joinRequest.getMinimumTerm(), joinRequest.getOptionalJoin());
190+
} else if (randomBoolean()) {
191+
return new JoinRequest(joinRequest.getSourceNode(),
192+
randomValueOtherThan(joinRequest.getMinimumTerm(), ESTestCase::randomNonNegativeLong),
193+
joinRequest.getOptionalJoin());
189194
} else {
190195
// change OptionalJoin
191196
final Optional<Join> newOptionalJoin;
@@ -195,7 +200,7 @@ public void testJoinRequestEqualsHashCodeSerialization() {
195200
newOptionalJoin = Optional.of(new Join(joinRequest.getSourceNode(), createNode(randomAlphaOfLength(10)),
196201
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
197202
}
198-
return new JoinRequest(joinRequest.getSourceNode(), newOptionalJoin);
203+
return new JoinRequest(joinRequest.getSourceNode(), joinRequest.getMinimumTerm(), newOptionalJoin);
199204
}
200205
});
201206
}

0 commit comments

Comments
 (0)