3434import org .elasticsearch .cluster .routing .RerouteService ;
3535import org .elasticsearch .cluster .routing .allocation .AllocationService ;
3636import org .elasticsearch .cluster .service .MasterService ;
37- import org .elasticsearch .common .Nullable ;
3837import org .elasticsearch .common .Priority ;
3938import org .elasticsearch .common .collect .Tuple ;
4039import org .elasticsearch .common .io .stream .StreamInput ;
4342import org .elasticsearch .common .unit .TimeValue ;
4443import org .elasticsearch .discovery .zen .MembershipAction ;
4544import org .elasticsearch .discovery .zen .ZenDiscovery ;
46- import org .elasticsearch .discovery .DiscoveryModule ;
4745import org .elasticsearch .monitor .NodeHealthService ;
4846import org .elasticsearch .monitor .StatusInfo ;
4947import org .elasticsearch .threadpool .ThreadPool ;
@@ -83,22 +81,21 @@ public class JoinHelper {
8381 public static final String VALIDATE_JOIN_ACTION_NAME = "internal:cluster/coordination/join/validate" ;
8482 public static final String START_JOIN_ACTION_NAME = "internal:cluster/coordination/start_join" ;
8583
86- // the timeout for each join attempt
84+ // the timeout for Zen1 join attempts
8785 public static final Setting <TimeValue > JOIN_TIMEOUT_SETTING =
8886 Setting .timeSetting ("cluster.join.timeout" ,
89- TimeValue .timeValueMillis (60000 ), TimeValue .timeValueMillis (1 ), Setting .Property .NodeScope );
87+ TimeValue .timeValueMillis (60000 ), TimeValue .timeValueMillis (1 ), Setting .Property .NodeScope , Setting . Property . Deprecated );
9088
9189 private final MasterService masterService ;
9290 private final TransportService transportService ;
9391 private final JoinTaskExecutor joinTaskExecutor ;
9492
95- @ Nullable // if using single-node discovery
96- private final TimeValue joinTimeout ;
93+ private final TimeValue joinTimeout ; // only used for Zen1 joining
9794 private final NodeHealthService nodeHealthService ;
9895
9996 private final Set <Tuple <DiscoveryNode , JoinRequest >> pendingOutgoingJoins = Collections .synchronizedSet (new HashSet <>());
10097
101- private AtomicReference <FailedJoinAttempt > lastFailedJoinAttempt = new AtomicReference <>();
98+ private final AtomicReference <FailedJoinAttempt > lastFailedJoinAttempt = new AtomicReference <>();
10299
103100 JoinHelper (Settings settings , AllocationService allocationService , MasterService masterService ,
104101 TransportService transportService , LongSupplier currentTermSupplier , Supplier <ClusterState > currentStateSupplier ,
@@ -108,7 +105,7 @@ public class JoinHelper {
108105 this .masterService = masterService ;
109106 this .transportService = transportService ;
110107 this .nodeHealthService = nodeHealthService ;
111- this .joinTimeout = DiscoveryModule . isSingleNodeDiscovery ( settings ) ? null : JOIN_TIMEOUT_SETTING .get (settings );
108+ this .joinTimeout = JOIN_TIMEOUT_SETTING .get (settings );
112109 this .joinTaskExecutor = new JoinTaskExecutor (settings , allocationService , logger , rerouteService ) {
113110
114111 @ Override
@@ -286,15 +283,17 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join>
286283 logger .debug ("attempting to join {} with {}" , destination , joinRequest );
287284 final String actionName ;
288285 final TransportRequest transportRequest ;
286+ final TransportRequestOptions transportRequestOptions ;
289287 if (Coordinator .isZen1Node (destination )) {
290288 actionName = MembershipAction .DISCOVERY_JOIN_ACTION_NAME ;
291289 transportRequest = new MembershipAction .JoinRequest (transportService .getLocalNode ());
290+ transportRequestOptions = TransportRequestOptions .builder ().withTimeout (joinTimeout ).build ();
292291 } else {
293292 actionName = JOIN_ACTION_NAME ;
294293 transportRequest = joinRequest ;
294+ transportRequestOptions = TransportRequestOptions .EMPTY ;
295295 }
296- transportService .sendRequest (destination , actionName , transportRequest ,
297- TransportRequestOptions .builder ().withTimeout (joinTimeout ).build (),
296+ transportService .sendRequest (destination , actionName , transportRequest , transportRequestOptions ,
298297 new TransportResponseHandler <Empty >() {
299298 @ Override
300299 public Empty read (StreamInput in ) {
@@ -363,9 +362,7 @@ public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, Acti
363362 } else {
364363 actionName = VALIDATE_JOIN_ACTION_NAME ;
365364 }
366- transportService .sendRequest (node , actionName ,
367- new ValidateJoinRequest (state ),
368- TransportRequestOptions .builder ().withTimeout (joinTimeout ).build (),
365+ transportService .sendRequest (node , actionName , new ValidateJoinRequest (state ),
369366 new ActionListenerResponseHandler <>(listener , i -> Empty .INSTANCE , ThreadPool .Names .GENERIC ));
370367 }
371368
0 commit comments