Skip to content

Commit 0e9186e

Browse files
authored
Simplify Unicast Zen Ping (elastic#22277)
The `UnicastZenPing` shows it's age and is the result of many small changes. The current state of affairs is confusing and is hard to reason about. This PR cleans it up (while following the same original intentions). Highlights of the changes are: 1) Clear 3 round flow - no interleaving of scheduling. 2) The previous implementation did a best effort attempt to wait for ongoing pings to be sent and completed. The pings were guaranteed to complete because each used the total ping duration as a timeout. This did make it hard to reason about the total ping duration and the flow of the code. All of this is removed now and ping should just complete within the given duration or not be counted (note that it was very handy for testing, but I move the needed sync logic to the test). 3) Because of (2) the pinging scheduling changed a bit, to give a chance for the last round to complete. We now ping at the beginning, 1/3 and 2/3 of the duration. 4) To offset for (3) a bit, incoming ping requests are now added to on going ping collections. 5) UnicastZenPing never establishes full blown connections (but does reuse them if there). Relates to elastic#22120 6) Discovery host providers are only used once per pinging round. Closes elastic#21739 7) Usage of the ability to open a connection without connecting to a node ( elastic#22194 ) and shorter connection timeouts helps with connections piling up. Closes elastic#19370 8) Beefed up testing and sped them up. 9) removed light profile from production code
1 parent 567c65b commit 0e9186e

File tree

17 files changed

+715
-454
lines changed

17 files changed

+715
-454
lines changed

core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,21 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
101101

102102
private final TransportClient.HostFailureListener hostFailureListener;
103103

104+
// TODO: migrate this to use low level connections and single type channels
105+
/** {@link ConnectionProfile} to use when to connecting to the listed nodes and doing a liveness check */
106+
private static final ConnectionProfile LISTED_NODES_PROFILE;
107+
108+
static {
109+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
110+
builder.addConnections(1,
111+
TransportRequestOptions.Type.BULK,
112+
TransportRequestOptions.Type.PING,
113+
TransportRequestOptions.Type.RECOVERY,
114+
TransportRequestOptions.Type.REG,
115+
TransportRequestOptions.Type.STATE);
116+
LISTED_NODES_PROFILE = builder.build();
117+
}
118+
104119
TransportClientNodesService(Settings settings, TransportService transportService,
105120
ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
106121
super(settings);
@@ -389,8 +404,8 @@ protected void doSample() {
389404
if (!transportService.nodeConnected(listedNode)) {
390405
try {
391406
// its a listed node, light connect to it...
392-
logger.trace("connecting to listed node (light) [{}]", listedNode);
393-
transportService.connectToNode(listedNode, ConnectionProfile.LIGHT_PROFILE);
407+
logger.trace("connecting to listed node [{}]", listedNode);
408+
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
394409
} catch (Exception e) {
395410
logger.info(
396411
(Supplier<?>)
@@ -470,7 +485,7 @@ public void run() {
470485
} else {
471486
// its a listed node, light connect to it...
472487
logger.trace("connecting to listed node (light) [{}]", listedNode);
473-
transportService.connectToNode(listedNode, ConnectionProfile.LIGHT_PROFILE);
488+
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
474489
}
475490
} catch (Exception e) {
476491
logger.debug(

core/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.node.DiscoveryNode;
2626
import org.elasticsearch.common.component.AbstractComponent;
27-
import org.elasticsearch.common.inject.Inject;
2827
import org.elasticsearch.common.settings.Setting;
2928
import org.elasticsearch.common.settings.Setting.Property;
3029
import org.elasticsearch.common.settings.Settings;
@@ -174,7 +173,7 @@ public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, Clust
174173
* Returns the given nodes sorted by likelihood of being elected as master, most likely first.
175174
* Non-master nodes are not removed but are rather put in the end
176175
*/
177-
public static List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
176+
static List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
178177
ArrayList<DiscoveryNode> sortedNodes = CollectionUtils.iterableAsArrayList(nodes);
179178
CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes);
180179
return sortedNodes;

core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

Lines changed: 261 additions & 261 deletions
Large diffs are not rendered by default.

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@
6767
import java.util.ArrayList;
6868
import java.util.List;
6969
import java.util.Set;
70-
import java.util.concurrent.CountDownLatch;
70+
import java.util.concurrent.CompletableFuture;
71+
import java.util.concurrent.ExecutionException;
7172
import java.util.concurrent.atomic.AtomicBoolean;
7273
import java.util.concurrent.atomic.AtomicInteger;
7374
import java.util.concurrent.atomic.AtomicReference;
74-
import java.util.function.BiFunction;
7575
import java.util.function.Consumer;
7676
import java.util.stream.Collectors;
7777

@@ -1021,24 +1021,22 @@ public void handleException(TransportException exp) {
10211021
}
10221022

10231023
private ZenPing.PingCollection pingAndWait(TimeValue timeout) {
1024-
final ZenPing.PingCollection response = new ZenPing.PingCollection();
1025-
final CountDownLatch latch = new CountDownLatch(1);
1024+
final CompletableFuture<ZenPing.PingCollection> response = new CompletableFuture<>();
10261025
try {
1027-
zenPing.ping(pings -> {
1028-
response.addPings(pings);
1029-
latch.countDown();
1030-
}, timeout);
1026+
zenPing.ping(response::complete, timeout);
10311027
} catch (Exception ex) {
1032-
logger.warn("Ping execution failed", ex);
1033-
latch.countDown();
1028+
// logged later
1029+
response.completeExceptionally(ex);
10341030
}
10351031

10361032
try {
1037-
latch.await();
1038-
return response;
1033+
return response.get();
10391034
} catch (InterruptedException e) {
10401035
logger.trace("pingAndWait interrupted");
1041-
return response;
1036+
return new ZenPing.PingCollection();
1037+
} catch (ExecutionException e) {
1038+
logger.warn("Ping execution failed", e);
1039+
return new ZenPing.PingCollection();
10421040
}
10431041
}
10441042

core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,29 +30,19 @@
3030

3131
import java.io.IOException;
3232
import java.util.ArrayList;
33-
import java.util.Collection;
3433
import java.util.HashMap;
3534
import java.util.List;
3635
import java.util.Map;
3736
import java.util.concurrent.atomic.AtomicLong;
37+
import java.util.function.Consumer;
3838

3939
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
4040

4141
public interface ZenPing extends Releasable {
4242

4343
void start(PingContextProvider contextProvider);
4444

45-
void ping(PingListener listener, TimeValue timeout);
46-
47-
interface PingListener {
48-
49-
/**
50-
* called when pinging is done.
51-
*
52-
* @param pings ping result *must
53-
*/
54-
void onPing(Collection<PingResponse> pings);
55-
}
45+
void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);
5646

5747
class PingResponse implements Streamable {
5848

@@ -191,13 +181,6 @@ public synchronized boolean addPing(PingResponse ping) {
191181
return false;
192182
}
193183

194-
/** adds multiple pings if newer than previous pings from the same node */
195-
public synchronized void addPings(Iterable<PingResponse> pings) {
196-
for (PingResponse ping : pings) {
197-
addPing(ping);
198-
}
199-
}
200-
201184
/** serialize current pings to a list. It is guaranteed that the list contains one ping response per node */
202185
public synchronized List<PingResponse> toList() {
203186
return new ArrayList<>(pings.values());

core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import org.elasticsearch.common.inject.internal.Nullable;
2122
import org.elasticsearch.common.unit.TimeValue;
2223

2324
import java.util.ArrayList;
@@ -35,16 +36,25 @@
3536
public final class ConnectionProfile {
3637

3738
/**
38-
* A pre-built light connection profile that shares a single connection across all
39-
* types.
39+
* Builds a connection profile that is dedicated to a single channel type. Use this
40+
* when opening single use connections
4041
*/
41-
public static final ConnectionProfile LIGHT_PROFILE = new ConnectionProfile(
42-
Collections.singletonList(new ConnectionTypeHandle(0, 1, EnumSet.of(
43-
TransportRequestOptions.Type.BULK,
44-
TransportRequestOptions.Type.PING,
45-
TransportRequestOptions.Type.RECOVERY,
46-
TransportRequestOptions.Type.REG,
47-
TransportRequestOptions.Type.STATE))), 1, null, null);
42+
public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType,
43+
@Nullable TimeValue connectTimeout,
44+
@Nullable TimeValue handshakeTimeout) {
45+
Builder builder = new Builder();
46+
builder.addConnections(1, channelType);
47+
final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
48+
otherTypes.remove(channelType);
49+
builder.addConnections(0, otherTypes.stream().toArray(TransportRequestOptions.Type[]::new));
50+
if (connectTimeout != null) {
51+
builder.setConnectTimeout(connectTimeout);
52+
}
53+
if (handshakeTimeout != null) {
54+
builder.setHandshakeTimeout(handshakeTimeout);
55+
}
56+
return builder.build();
57+
}
4858

4959
private final List<ConnectionTypeHandle> handles;
5060
private final int numConnections;

core/src/main/java/org/elasticsearch/transport/Transport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ public interface Transport extends LifecycleComponent {
6363
boolean nodeConnected(DiscoveryNode node);
6464

6565
/**
66-
* Connects to a node with the given connection profile. Use {@link ConnectionProfile#LIGHT_PROFILE} when just connecting for ping
67-
* and then disconnecting. If the node is already connected this method has no effect
66+
* Connects to a node with the given connection profile. If the node is already connected this method has no effect
6867
*/
6968
void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile) throws ConnectTransportException;
7069

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import java.util.concurrent.CopyOnWriteArrayList;
6363
import java.util.concurrent.CountDownLatch;
6464
import java.util.concurrent.ScheduledFuture;
65-
import java.util.concurrent.atomic.AtomicLong;
6665
import java.util.function.Function;
6766
import java.util.function.Supplier;
6867

@@ -328,32 +327,6 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP
328327
}
329328
}
330329

331-
/**
332-
* Lightly connect to the specified node, returning updated node
333-
* information. The handshake will fail if the cluster name on the
334-
* target node mismatches the local cluster name and
335-
* {@code checkClusterName} is {@code true}.
336-
*
337-
* @param node the node to connect to
338-
* @param handshakeTimeout handshake timeout
339-
* @return the connected node
340-
* @throws ConnectTransportException if the connection failed
341-
* @throws IllegalStateException if the handshake failed
342-
*/
343-
public DiscoveryNode connectToNodeAndHandshake(
344-
final DiscoveryNode node,
345-
final long handshakeTimeout) throws IOException {
346-
if (node.equals(localNode)) {
347-
return localNode;
348-
}
349-
DiscoveryNode handshakeNode;
350-
try (Transport.Connection connection = transport.openConnection(node, ConnectionProfile.LIGHT_PROFILE)) {
351-
handshakeNode = handshake(connection, handshakeTimeout);
352-
}
353-
connectToNode(node, ConnectionProfile.LIGHT_PROFILE);
354-
return handshakeNode;
355-
}
356-
357330
/**
358331
* Executes a high-level handshake using the given connection
359332
* and returns the discovery node of the node the connection

0 commit comments

Comments
 (0)