Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
SearchService.DEFAULT_KEEPALIVE_SETTING,
SearchService.KEEPALIVE_INTERVAL_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
Expand Down
190 changes: 159 additions & 31 deletions core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.discovery.zen;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -57,23 +58,30 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
Expand All @@ -89,6 +97,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
Property.NodeScope);
public static final Setting<Integer> DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope);
public static final Setting<TimeValue> DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(30), Property.NodeScope);

// these limits are per-address
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
Expand All @@ -100,7 +110,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {

private final int concurrentConnects;

private final DiscoveryNode[] configuredTargetNodes;
private final List<String> configuredHosts;

private final int limitPortCounts;

private volatile PingContextProvider contextProvider;

Expand All @@ -114,13 +126,15 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {

private final Map<Integer, SendPingsHandler> receivedResponses = newConcurrentMap();

// a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes)
// a list of temporal responses a node will return for a request (holds requests from other configuredHosts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's just other nodes - we are pinged by any node in the previous cluster.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 4868321.

private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();

private final UnicastHostsProvider hostsProvider;

private final ExecutorService unicastConnectExecutor;

private final TimeValue resolveTimeout;

private volatile boolean closed = false;

public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
Expand All @@ -132,59 +146,158 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
this.hostsProvider = unicastHostsProvider;

this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
final int limitPortCounts;
final List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
if (hosts.isEmpty()) {
// if unicast hosts are not specified, fill with simple defaults on the local machine
configuredHosts = transportService.getLocalAddresses();
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
hosts.addAll(transportService.getLocalAddresses());
} else {
configuredHosts = hosts;
// we only limit to 1 addresses, makes no sense to ping 100 ports
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
}

logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects);
List<DiscoveryNode> configuredTargetNodes = new ArrayList<>();
for (final String host : hosts) {
configuredTargetNodes.addAll(resolveDiscoveryNodes(host, limitPortCounts, transportService,
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#"));
}
this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]);
resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
logger.debug(
"using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]",
configuredHosts,
concurrentConnects,
resolveTimeout);

transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME,
new UnicastPingRequestHandler());

ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS,
threadFactory, threadPool.getThreadContext());

}

private static class ResolvedHostname {

private final TransportAddress[] addresses;
private final UnknownHostException failure;

public static ResolvedHostname success(final TransportAddress[] addresses) {
return new ResolvedHostname(addresses, null);
}

public static ResolvedHostname failure(final UnknownHostException failure) {
return new ResolvedHostname(null, failure);
}

private ResolvedHostname(final TransportAddress[] addresses, UnknownHostException failure) {
assert addresses != null && failure == null || addresses == null && failure != null;
this.addresses = addresses;
this.failure = failure;
}

public boolean isSuccess() {
return addresses != null;
}

public TransportAddress[] addresses() {
return addresses;
}

public UnknownHostException failure() {
assert !isSuccess();
return failure;
}

}

/**
* Resolves a host to a list of discovery nodes. The host is resolved into a transport
* address (or a collection of addresses if the number of ports is greater than one) and
* the transport addresses are used to created discovery nodes.
* Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses
* if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. Host lookups are done
* in parallel using the generic thread pool from the specified thread pool up to the specified resolve timeout.
*
* @param host the host to resolve
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
* @param threadPool the thread pool used to parallelize hostname lookups
* @param logger logger used for logging messages regarding hostname lookups
* @param hosts the hosts to resolve
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
* @param transportService the transport service
* @param idGenerator the generator to supply unique ids for each discovery node
* @param idGenerator the generator to supply unique ids for each discovery node
* @param resolveTimeout the timeout before returning from hostname lookups
* @return a list of discovery nodes with resolved transport addresses
*/
public static List<DiscoveryNode> resolveDiscoveryNodes(final String host, final int limitPortCounts,
final TransportService transportService, final Supplier<String> idGenerator) {
List<DiscoveryNode> discoveryNodes = new ArrayList<>();
try {
TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
for (TransportAddress address : addresses) {
discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
public static List<DiscoveryNode> resolveDiscoveryNodes(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can get rid of the ResolvedHostname abstraction - what am I missing?

diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
index 61bf1cc..3d3495e 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java
@@ -242,40 +242,36 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
             throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
         }
         // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
-        final List<Callable<ResolvedHostname>> callables =
+        final List<Callable<TransportAddress[]>> callables =
             hosts.stream().map(hn -> lookup(hn, transportService, limitPortCounts)).collect(Collectors.toList());
-        final List<Future<ResolvedHostname>> futures =
+        final List<Future<TransportAddress[]>> futures =
             executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
         final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
         // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
         // hostname with the corresponding task by iterating together
         final Iterator<String> it = hosts.iterator();
-        for (final Future<ResolvedHostname> future : futures) {
+        for (final Future<TransportAddress[]> future : futures) {
             final String hostname = it.next();
-            if (!future.isCancelled()) {
+            if (future.isCancelled()) {
+                logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
+            } else {
+                assert future.isDone(); // guaranteed by the invokeAll
                 try {
-                    final ResolvedHostname resolvedHostname = future.get();
-                    if (resolvedHostname.isSuccess()) {
-                        logger.trace("resolved host [{}] to {}", hostname, resolvedHostname.addresses());
-                        for (final TransportAddress address : resolvedHostname.addresses()) {
-                            discoveryNodes.add(
-                                new DiscoveryNode(
-                                    idGenerator.get(),
-                                    address,
-                                    emptyMap(),
-                                    emptySet(),
-                                    Version.CURRENT.minimumCompatibilityVersion()));
-                        }
-                    } else {
-                        final String message = "failed to resolve host [" + hostname + "]";
-                        logger.warn(message, resolvedHostname.failure());
+                    final TransportAddress[] addresses = future.get();
+                    logger.trace("resolved host [{}] to {}", hostname, addresses);
+                    for (final TransportAddress address : addresses) {
+                        discoveryNodes.add(
+                            new DiscoveryNode(
+                                idGenerator.get(),
+                                address,
+                                emptyMap(),
+                                emptySet(),
+                                Version.CURRENT.minimumCompatibilityVersion()));
                     }
                 } catch (final ExecutionException e) {
                     final String message = "failed to resolve host [" + hostname + "]";
                     logger.warn(message, e);
                 }
-            } else {
-                logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
             }
         }
         return discoveryNodes;
@@ -289,17 +285,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
      * @param limitPortCounts  the port count limit
      * @return a callable that can be used to submit to an executor service
      */
-    private static Callable<ResolvedHostname> lookup(
+    private static Callable<TransportAddress[]> lookup(
         final String host,
         final TransportService transportService,
         final int limitPortCounts) {
-        return () -> {
-            try {
-                return ResolvedHostname.success(transportService.addressesFromString(host, limitPortCounts));
-            } catch (final UnknownHostException e) {
-                return ResolvedHostname.failure(e);
-            }
-        };
+        return () -> transportService.addressesFromString(host, limitPortCounts);
     }
 
     @Override

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I pushed e2fc5a2.

final ThreadPool threadPool,
final Logger logger,
final List<String> hosts,
final int limitPortCounts,
final TransportService transportService,
final Supplier<String> idGenerator,
final TimeValue resolveTimeout) throws InterruptedException {
Objects.requireNonNull(threadPool);
Objects.requireNonNull(logger);
Objects.requireNonNull(hosts);
Objects.requireNonNull(transportService);
Objects.requireNonNull(idGenerator);
Objects.requireNonNull(resolveTimeout);
if (resolveTimeout.nanos() < 0) {
throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
}
// create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
final List<Callable<ResolvedHostname>> callables =
hosts.stream().map(hn -> lookup(hn, transportService, limitPortCounts)).collect(Collectors.toList());
final List<Future<ResolvedHostname>> futures =
threadPool.generic().invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might decide this is okay, but one question is whether these lookups should be throttled at all as right now it is just spamming the generic thread pool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we should use unicastConnectExecutor for this and keep it contained (and throttled).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I went with this approach is since this method is also used by the file-based unicast hosts provider so it would need its own executor and we do not have a close method for plugins to allow this executor be shutdown.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually plugins can implement Closeable and they will be closed when the node shuts down.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I was looking for it on the Plugins base class, but I see we do close a plugin if it implements java.io.Closeable. I think this should be made more clear and opened #21669.

final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
// ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
// hostname with the corresponding task by iterating together
final Iterator<String> it = hosts.iterator();
for (final Future<ResolvedHostname> future : futures) {
final String hostname = it.next();
if (!future.isCancelled()) {
try {
final ResolvedHostname resolvedHostname = future.get();
if (resolvedHostname.isSuccess()) {
logger.trace("resolved host [{}] to {}", hostname, resolvedHostname.addresses());
for (final TransportAddress address : resolvedHostname.addresses()) {
discoveryNodes.add(
new DiscoveryNode(
idGenerator.get(),
address,
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
}
} else {
final String message = "failed to resolve host [" + hostname + "]";
logger.warn(message, resolvedHostname.failure());
}
} catch (final ExecutionException e) {
final String message = "failed to resolve host [" + hostname + "]";
logger.warn(message, e);
}
} else {
logger.warn("timed out resolving host [{}]", hostname);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add the timeout value here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed f6203d7.

}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e);
}
return discoveryNodes;
}

/**
* Creates a callable for looking up the specified host.
*
* @param host the host to lookup
* @param transportService the transport service to use for lookups
* @param limitPortCounts the port count limit
* @return a callable that can be used to submit to an executor service
*/
private static Callable<ResolvedHostname> lookup(
final String host,
final TransportService transportService,
final int limitPortCounts) {
return () -> {
try {
return ResolvedHostname.success(transportService.addressesFromString(host, limitPortCounts));
} catch (final UnknownHostException e) {
return ResolvedHostname.failure(e);
}
};
}

@Override
public void close() {
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
Expand Down Expand Up @@ -330,8 +443,23 @@ void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final Send
// sort the nodes by likelihood of being an active master
List<DiscoveryNode> sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet);

// new add the unicast targets first
List<DiscoveryNode> nodesToPing = CollectionUtils.arrayAsArrayList(configuredTargetNodes);
// add the configured hosts first
final List<DiscoveryNode> nodesToPing = new ArrayList<>();
final List<DiscoveryNode> resolvedDiscoveryNodes;
try {
resolvedDiscoveryNodes = resolveDiscoveryNodes(
threadPool,
logger,
configuredHosts,
limitPortCounts,
transportService,
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
resolveTimeout);
nodesToPing.addAll(resolvedDiscoveryNodes);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}

nodesToPing.addAll(sortedNodesToPing);

final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
Expand Down Expand Up @@ -715,7 +714,7 @@ public static int resolvePublishPort(String profileName, Settings settings, Sett
}

@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.transport.TransportAddress;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;

Expand All @@ -53,7 +54,7 @@ public interface Transport extends LifecycleComponent {
/**
* Returns an address from its string representation.
*/
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception;
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException;

/**
* Is the address type supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -617,7 +618,7 @@ private long newRequestId() {
return requestIds.getAndIncrement();
}

public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return transport.addressesFromString(address, perAddressLimit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.transport.TransportServiceAdapter;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -133,7 +134,7 @@ public BoundTransportAddress boundAddress() {
}

@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.junit.Before;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -188,7 +189,7 @@ public Map<String, BoundTransportAddress> profileBoundAddresses() {
}

@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return new TransportAddress[0];
}

Expand Down
Loading