From e5dfef88f55e0fe4a1a39ca536c792a15c463bfb Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 15 Dec 2016 16:12:55 +0100 Subject: [PATCH 1/4] Never connect and publish connection during unicast zen ping UnicastZenPing today establishes real connections to nodes during it's ping phase that can be used by other parts of the system. Yet, this is potentially dangerous and undesirable unless the nodes have been fully verified and shoudl be connected to in the case of a cluster state update or if we join a newly elected master. For zen-pings this change adds dedicated connections that are only kept alive while pings are running. --- .../discovery/zen/UnicastZenPing.java | 81 ++++++++++++------- .../transport/TransportService.java | 47 ++++++----- .../discovery/zen/UnicastZenPingTests.java | 3 +- .../TransportServiceHandshakeTests.java | 31 ++++--- .../test/transport/MockTransportService.java | 44 +++++++++- .../AbstractSimpleTransportTestCase.java | 8 +- 6 files changed, 144 insertions(+), 70 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 4a6006c1a040c..9d794bf8473f3 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -47,7 +48,9 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -58,6 +61,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -68,8 +72,8 @@ import java.util.Map; import java.util.Objects; import java.util.Queue; -import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -80,6 +84,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -328,10 +333,6 @@ protected void doRun() throws Exception { sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes); sendPingsHandler.close(); listener.onPing(sendPingsHandler.pingCollection().toList()); - for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { - logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node); - transportService.disconnectFromNode(node); - } } @Override @@ -359,7 +360,7 @@ public void onFailure(Exception e) { class SendPingsHandler implements Releasable { private final int id; - private final Set nodeToDisconnect = ConcurrentCollections.newConcurrentSet(); + private final List temporaryConnections = new CopyOnWriteArrayList<>(); private final PingCollection pingCollection; private AtomicBoolean closed = new AtomicBoolean(false); @@ -385,8 +386,18 @@ public PingCollection pingCollection() { public void close() { if (closed.compareAndSet(false, true)) { receivedResponses.remove(id); + try { + IOUtils.close(temporaryConnections); + temporaryConnections.clear(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } + + public void addTemporaryConnection(Transport.Connection connection) { + temporaryConnections.add(connection); + } } @@ -455,7 +466,6 @@ void sendPings( logger.trace("replacing {} with temp node {}", nodeToSend, tempNode); nodeToSend = tempNode; } - sendPingsHandler.nodeToDisconnect.add(nodeToSend); } // fork the connection to another thread final DiscoveryNode finalNodeToSend = nodeToSend; @@ -467,18 +477,20 @@ public void run() { } boolean success = false; try { - // connect to the node, see if we manage to do it, if not, bail - if (!nodeFoundByAddress) { - logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend); - transportService.connectToNodeAndHandshake(finalNodeToSend, timeout.getMillis()); + Transport.Connection connection; + if (nodeFoundByAddress && transportService.nodeConnected(finalNodeToSend)) { + logger.trace("[{}] reusing existing ping connection to {}", sendPingsHandler.id(), finalNodeToSend); + connection = transportService.getConnection(finalNodeToSend); } else { - logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend); - transportService.connectToNode(finalNodeToSend); + // connect to the node, see if we manage to do it, if not, bail + logger.trace("[{}] open ping connection to {}", sendPingsHandler.id(), finalNodeToSend); + connection = transportService.openConnection(finalNodeToSend, ConnectionProfile.LIGHT_PROFILE); + sendPingsHandler.addTemporaryConnection(connection); + transportService.handshake(connection, timeout.millis()); } - logger.trace("[{}] connected to {}", sendPingsHandler.id(), node); if (receivedResponses.containsKey(sendPingsHandler.id())) { // we are connected and still in progress, send the ping request - sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend); + sendPingRequestToNode(() -> connection, sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend); } else { // connect took too long, just log it and bail latch.countDown(); @@ -508,7 +520,9 @@ public void run() { } }); } else { - sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend); + final DiscoveryNode finalNodeToSend = nodeToSend; + sendPingRequestToNode(() -> transportService.getConnection(finalNodeToSend), + sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend); } } if (waitTime != null) { @@ -520,11 +534,22 @@ public void run() { } } - private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, - final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { + private void sendPingRequestToNode(final Supplier connection, final int id, final TimeValue timeout, + final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, + final DiscoveryNode nodeToSend) { logger.trace("[{}] sending to {}", id, nodeToSend); - transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder() - .withTimeout((long) (timeout.millis() * 1.25)).build(), new TransportResponseHandler() { + TransportRequestOptions options = TransportRequestOptions.builder() + .withTimeout((long) (timeout.millis() * 1.25)).build(); + Consumer handleException = (exp) -> { + latch.countDown(); + if (exp instanceof ConnectTransportException) { + // ok, not connected... + logger.trace((Supplier) () -> new ParameterizedMessage("failed to connect to {}", nodeToSend), exp); + } else { + logger.warn((Supplier) () -> new ParameterizedMessage("failed to send ping to [{}]", node), exp); + } + }; + TransportResponseHandler handler = new TransportResponseHandler() { @Override public UnicastPingResponse newInstance() { @@ -563,15 +588,15 @@ public void handleResponse(UnicastPingResponse response) { @Override public void handleException(TransportException exp) { - latch.countDown(); - if (exp instanceof ConnectTransportException) { - // ok, not connected... - logger.trace((Supplier) () -> new ParameterizedMessage("failed to connect to {}", nodeToSend), exp); - } else { - logger.warn((Supplier) () -> new ParameterizedMessage("failed to send ping to [{}]", node), exp); - } + handleException.accept(exp); } - }); + }; + try { + transportService.sendRequest(connection.get(), ACTION_NAME, pingRequest, options, handler); + } catch (Exception e) { + // connection.get() might barf - we have to handle this + handleException.accept(e); + } } private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index a02b763f2d981..384057bbcc3b0 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -290,6 +290,9 @@ public List getLocalAddresses() { return transport.getLocalAddresses(); } + /** + * Returns true iff the given node is already connected. + */ public boolean nodeConnected(DiscoveryNode node) { return node.equals(localNode) || transport.nodeConnected(node); } @@ -312,32 +315,32 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection } /** - * Lightly connect to the specified node, returning updated node - * information. The handshake will fail if the cluster name on the - * target node mismatches the local cluster name and - * {@code checkClusterName} is {@code true}. + * Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers + * responsibility to close the connection once it goes out of scope. + * @param node the node to connect to + * @param profile the connection profile to use + */ + public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException { + if (node.equals(localNode)) { + return localNodeConnection; + } else { + return transport.openConnection(node, profile); + } + } + + /** + * Executes a high-level handshake using the given connection + * and returns the discovery node of the node the connection + * was established with. The handshake will fail if the cluster + * name on the target node mismatches the local cluster name. * - * @param node the node to connect to + * @param connection the connection to a specific node * @param handshakeTimeout handshake timeout * @return the connected node * @throws ConnectTransportException if the connection failed * @throws IllegalStateException if the handshake failed */ - public DiscoveryNode connectToNodeAndHandshake( - final DiscoveryNode node, - final long handshakeTimeout) throws IOException { - if (node.equals(localNode)) { - return localNode; - } - DiscoveryNode handshakeNode; - try (Transport.Connection connection = transport.openConnection(node, ConnectionProfile.LIGHT_PROFILE)) { - handshakeNode = handshake(connection, handshakeTimeout); - } - connectToNode(node, ConnectionProfile.LIGHT_PROFILE); - return handshakeNode; - } - - private DiscoveryNode handshake( + public DiscoveryNode handshake( final Transport.Connection connection, final long handshakeTimeout) throws ConnectTransportException { final HandshakeResponse response; @@ -465,7 +468,7 @@ public final void sendRequest(final DiscoveryNode } } - final void sendRequest(final Transport.Connection connection, final String action, + public final void sendRequest(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { @@ -477,7 +480,7 @@ final void sendRequest(final Transport.Connection * Returns either a real transport connection or a local node connection if we are using the local node optimization. * @throws NodeNotConnectedException if the given node is not connected */ - private Transport.Connection getConnection(DiscoveryNode node) { + public Transport.Connection getConnection(DiscoveryNode node) { if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) { return localNodeConnection; } else { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 9886abb900ab9..de8d1a562e803 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; @@ -571,7 +572,7 @@ private NetworkHandle startServices( final BiFunction supplier) { final Transport transport = supplier.apply(settings, version); final TransportService transportService = - new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); transportService.start(); transportService.acceptIncomingRequests(); final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index 16735f34efeec..ec03ffd8ebeaa 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -113,14 +113,15 @@ public void testConnectToNodeLight() throws IOException { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - DiscoveryNode connectedNode = - handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout); - assertNotNull(connectedNode); - - // the name and version should be updated - assertEquals(connectedNode.getName(), "TS_B"); - assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); - assertTrue(handleA.transportService.nodeConnected(discoveryNode)); + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){ + DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout); + assertNotNull(connectedNode); + // the name and version should be updated + assertEquals(connectedNode.getName(), "TS_B"); + assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); + assertFalse(handleA.transportService.nodeConnected(discoveryNode)); + } + } public void testMismatchedClusterName() { @@ -133,8 +134,11 @@ public void testMismatchedClusterName() { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake( - discoveryNode, timeout)); + IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)) { + handleA.transportService.handshake(connection, timeout); + } + }); assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]")); assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } @@ -150,8 +154,11 @@ public void testIncompatibleVersions() { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake( - discoveryNode, timeout)); + IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)) { + handleA.transportService.handshake(connection, timeout); + } + }); assertThat(ex.getMessage(), containsString("handshake failed, incompatible version")); assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index dd05457cec11a..a35a1919cb711 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -57,11 +57,13 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; @@ -80,6 +82,7 @@ */ public final class MockTransportService extends TransportService { + private final Map> openConnections = new HashMap<>(); public static class TestPlugin extends Plugin { @Override @@ -553,9 +556,7 @@ public void stop() { } @Override - public void close() { - transport.close(); - } + public void close() { transport.close(); } @Override public Map profileBoundAddresses() { @@ -701,4 +702,41 @@ public Transport getOriginalTransport() { } return transport; } + + @Override + public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) { + final AtomicBoolean closed = new AtomicBoolean(false); + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + if (closed.compareAndSet(false, true)) { + synchronized (openConnections) { + List connections = openConnections.get(node); + boolean remove = connections.remove(this); + assert remove; + if (connections.isEmpty()) { + openConnections.remove(node); + } + } + } + } + + } + }; + synchronized (openConnections) { + List connections = openConnections.computeIfAbsent(node, + (n) -> new CopyOnWriteArrayList<>()); + connections.add(filteredConnection); + } + return filteredConnection; + } + + @Override + protected void doClose() { + super.doClose(); + assert openConnections.size() == 0 : "still open connections: " + openConnections; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 2beff7890b3ce..0f3f7b501953f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1346,8 +1346,8 @@ public void handleException(TransportException exp) { // all is well } - try { - serviceB.connectToNodeAndHandshake(nodeA, 100); + try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){ + serviceB.handshake(connection, 100); fail("exception should be thrown"); } catch (IllegalStateException e) { // all is well @@ -1404,8 +1404,8 @@ public void handleException(TransportException exp) { // all is well } - try { - serviceB.connectToNodeAndHandshake(nodeA, 100); + try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){ + serviceB.handshake(connection, 100); fail("exception should be thrown"); } catch (IllegalStateException e) { // all is well From 2147f851a58d1080b39522ae3a6b96ca34328fa3 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 16 Dec 2016 14:28:15 +0100 Subject: [PATCH 2/4] fix line len --- .../java/org/elasticsearch/discovery/zen/UnicastZenPing.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 9d794bf8473f3..8be40106f819c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -490,7 +490,8 @@ public void run() { } if (receivedResponses.containsKey(sendPingsHandler.id())) { // we are connected and still in progress, send the ping request - sendPingRequestToNode(() -> connection, sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend); + sendPingRequestToNode(() -> connection, sendPingsHandler.id(), timeout, pingRequest, latch, node, + finalNodeToSend); } else { // connect took too long, just log it and bail latch.countDown(); From a6cda7e72ed060ff307f93176ec414abb06b540d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 16 Dec 2016 15:25:33 +0100 Subject: [PATCH 3/4] more line len fixes --- .../transport/TransportServiceHandshakeTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index ec03ffd8ebeaa..50be8c70f2ea1 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -135,7 +135,8 @@ public void testMismatchedClusterName() { emptySet(), Version.CURRENT.minimumCompatibilityVersion()); IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { - try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)) { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, + ConnectionProfile.LIGHT_PROFILE)) { handleA.transportService.handshake(connection, timeout); } }); @@ -155,7 +156,8 @@ public void testIncompatibleVersions() { emptySet(), Version.CURRENT.minimumCompatibilityVersion()); IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { - try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)) { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, + ConnectionProfile.LIGHT_PROFILE)) { handleA.transportService.handshake(connection, timeout); } }); From bd9eee53b76f18a74776fd71318b1246a7dd571d Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 16 Dec 2016 21:42:35 +0100 Subject: [PATCH 4/4] Rollback changes to UnicastZenPing.java --- .../discovery/zen/UnicastZenPing.java | 82 +++++++------------ .../transport/TransportService.java | 26 ++++++ .../TransportServiceHandshakeTests.java | 9 ++ 3 files changed, 63 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 8be40106f819c..4a6006c1a040c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -48,9 +47,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -61,7 +58,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -72,8 +68,8 @@ import java.util.Map; import java.util.Objects; import java.util.Queue; +import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -84,7 +80,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -333,6 +328,10 @@ protected void doRun() throws Exception { sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes); sendPingsHandler.close(); listener.onPing(sendPingsHandler.pingCollection().toList()); + for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { + logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node); + transportService.disconnectFromNode(node); + } } @Override @@ -360,7 +359,7 @@ public void onFailure(Exception e) { class SendPingsHandler implements Releasable { private final int id; - private final List temporaryConnections = new CopyOnWriteArrayList<>(); + private final Set nodeToDisconnect = ConcurrentCollections.newConcurrentSet(); private final PingCollection pingCollection; private AtomicBoolean closed = new AtomicBoolean(false); @@ -386,18 +385,8 @@ public PingCollection pingCollection() { public void close() { if (closed.compareAndSet(false, true)) { receivedResponses.remove(id); - try { - IOUtils.close(temporaryConnections); - temporaryConnections.clear(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } } } - - public void addTemporaryConnection(Transport.Connection connection) { - temporaryConnections.add(connection); - } } @@ -466,6 +455,7 @@ void sendPings( logger.trace("replacing {} with temp node {}", nodeToSend, tempNode); nodeToSend = tempNode; } + sendPingsHandler.nodeToDisconnect.add(nodeToSend); } // fork the connection to another thread final DiscoveryNode finalNodeToSend = nodeToSend; @@ -477,21 +467,18 @@ public void run() { } boolean success = false; try { - Transport.Connection connection; - if (nodeFoundByAddress && transportService.nodeConnected(finalNodeToSend)) { - logger.trace("[{}] reusing existing ping connection to {}", sendPingsHandler.id(), finalNodeToSend); - connection = transportService.getConnection(finalNodeToSend); + // connect to the node, see if we manage to do it, if not, bail + if (!nodeFoundByAddress) { + logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend); + transportService.connectToNodeAndHandshake(finalNodeToSend, timeout.getMillis()); } else { - // connect to the node, see if we manage to do it, if not, bail - logger.trace("[{}] open ping connection to {}", sendPingsHandler.id(), finalNodeToSend); - connection = transportService.openConnection(finalNodeToSend, ConnectionProfile.LIGHT_PROFILE); - sendPingsHandler.addTemporaryConnection(connection); - transportService.handshake(connection, timeout.millis()); + logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend); + transportService.connectToNode(finalNodeToSend); } + logger.trace("[{}] connected to {}", sendPingsHandler.id(), node); if (receivedResponses.containsKey(sendPingsHandler.id())) { // we are connected and still in progress, send the ping request - sendPingRequestToNode(() -> connection, sendPingsHandler.id(), timeout, pingRequest, latch, node, - finalNodeToSend); + sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend); } else { // connect took too long, just log it and bail latch.countDown(); @@ -521,9 +508,7 @@ public void run() { } }); } else { - final DiscoveryNode finalNodeToSend = nodeToSend; - sendPingRequestToNode(() -> transportService.getConnection(finalNodeToSend), - sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend); + sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend); } } if (waitTime != null) { @@ -535,22 +520,11 @@ public void run() { } } - private void sendPingRequestToNode(final Supplier connection, final int id, final TimeValue timeout, - final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, - final DiscoveryNode nodeToSend) { + private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, + final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { logger.trace("[{}] sending to {}", id, nodeToSend); - TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout((long) (timeout.millis() * 1.25)).build(); - Consumer handleException = (exp) -> { - latch.countDown(); - if (exp instanceof ConnectTransportException) { - // ok, not connected... - logger.trace((Supplier) () -> new ParameterizedMessage("failed to connect to {}", nodeToSend), exp); - } else { - logger.warn((Supplier) () -> new ParameterizedMessage("failed to send ping to [{}]", node), exp); - } - }; - TransportResponseHandler handler = new TransportResponseHandler() { + transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder() + .withTimeout((long) (timeout.millis() * 1.25)).build(), new TransportResponseHandler() { @Override public UnicastPingResponse newInstance() { @@ -589,15 +563,15 @@ public void handleResponse(UnicastPingResponse response) { @Override public void handleException(TransportException exp) { - handleException.accept(exp); + latch.countDown(); + if (exp instanceof ConnectTransportException) { + // ok, not connected... + logger.trace((Supplier) () -> new ParameterizedMessage("failed to connect to {}", nodeToSend), exp); + } else { + logger.warn((Supplier) () -> new ParameterizedMessage("failed to send ping to [{}]", node), exp); + } } - }; - try { - transportService.sendRequest(connection.get(), ACTION_NAME, pingRequest, options, handler); - } catch (Exception e) { - // connection.get() might barf - we have to handle this - handleException.accept(e); - } + }); } private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 384057bbcc3b0..8884177ba63ef 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -328,6 +328,32 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP } } + /** + * Lightly connect to the specified node, returning updated node + * information. The handshake will fail if the cluster name on the + * target node mismatches the local cluster name and + * {@code checkClusterName} is {@code true}. + * + * @param node the node to connect to + * @param handshakeTimeout handshake timeout + * @return the connected node + * @throws ConnectTransportException if the connection failed + * @throws IllegalStateException if the handshake failed + */ + public DiscoveryNode connectToNodeAndHandshake( + final DiscoveryNode node, + final long handshakeTimeout) throws IOException { + if (node.equals(localNode)) { + return localNode; + } + DiscoveryNode handshakeNode; + try (Transport.Connection connection = transport.openConnection(node, ConnectionProfile.LIGHT_PROFILE)) { + handshakeNode = handshake(connection, handshakeTimeout); + } + connectToNode(node, ConnectionProfile.LIGHT_PROFILE); + return handshakeNode; + } + /** * Executes a high-level handshake using the given connection * and returns the discovery node of the node the connection diff --git a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index 50be8c70f2ea1..fd756f6790e29 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -122,6 +122,15 @@ public void testConnectToNodeLight() throws IOException { assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } + DiscoveryNode connectedNode = + handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout); + assertNotNull(connectedNode); + + // the name and version should be updated + assertEquals(connectedNode.getName(), "TS_B"); + assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); + assertTrue(handleA.transportService.nodeConnected(discoveryNode)); + } public void testMismatchedClusterName() {