diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index a02b763f2d981..8884177ba63ef 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -290,6 +290,9 @@ public List getLocalAddresses() { return transport.getLocalAddresses(); } + /** + * Returns true iff the given node is already connected. + */ public boolean nodeConnected(DiscoveryNode node) { return node.equals(localNode) || transport.nodeConnected(node); } @@ -311,6 +314,20 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection transport.connectToNode(node, connectionProfile); } + /** + * Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers + * responsibility to close the connection once it goes out of scope. + * @param node the node to connect to + * @param profile the connection profile to use + */ + public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException { + if (node.equals(localNode)) { + return localNodeConnection; + } else { + return transport.openConnection(node, profile); + } + } + /** * Lightly connect to the specified node, returning updated node * information. The handshake will fail if the cluster name on the @@ -337,7 +354,19 @@ public DiscoveryNode connectToNodeAndHandshake( return handshakeNode; } - private DiscoveryNode handshake( + /** + * Executes a high-level handshake using the given connection + * and returns the discovery node of the node the connection + * was established with. The handshake will fail if the cluster + * name on the target node mismatches the local cluster name. + * + * @param connection the connection to a specific node + * @param handshakeTimeout handshake timeout + * @return the connected node + * @throws ConnectTransportException if the connection failed + * @throws IllegalStateException if the handshake failed + */ + public DiscoveryNode handshake( final Transport.Connection connection, final long handshakeTimeout) throws ConnectTransportException { final HandshakeResponse response; @@ -465,7 +494,7 @@ public final void sendRequest(final DiscoveryNode } } - final void sendRequest(final Transport.Connection connection, final String action, + public final void sendRequest(final Transport.Connection connection, final String action, final TransportRequest request, final TransportRequestOptions options, TransportResponseHandler handler) { @@ -477,7 +506,7 @@ final void sendRequest(final Transport.Connection * Returns either a real transport connection or a local node connection if we are using the local node optimization. * @throws NodeNotConnectedException if the given node is not connected */ - private Transport.Connection getConnection(DiscoveryNode node) { + public Transport.Connection getConnection(DiscoveryNode node) { if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) { return localNodeConnection; } else { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 9886abb900ab9..de8d1a562e803 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; @@ -571,7 +572,7 @@ private NetworkHandle startServices( final BiFunction supplier) { final Transport transport = supplier.apply(settings, version); final TransportService transportService = - new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); transportService.start(); transportService.acceptIncomingRequests(); final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java index 16735f34efeec..fd756f6790e29 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java @@ -113,14 +113,24 @@ public void testConnectToNodeLight() throws IOException { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){ + DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout); + assertNotNull(connectedNode); + // the name and version should be updated + assertEquals(connectedNode.getName(), "TS_B"); + assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); + assertFalse(handleA.transportService.nodeConnected(discoveryNode)); + } + DiscoveryNode connectedNode = - handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout); + handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout); assertNotNull(connectedNode); // the name and version should be updated assertEquals(connectedNode.getName(), "TS_B"); assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion()); assertTrue(handleA.transportService.nodeConnected(discoveryNode)); + } public void testMismatchedClusterName() { @@ -133,8 +143,12 @@ public void testMismatchedClusterName() { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake( - discoveryNode, timeout)); + IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, + ConnectionProfile.LIGHT_PROFILE)) { + handleA.transportService.handshake(connection, timeout); + } + }); assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]")); assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } @@ -150,8 +164,12 @@ public void testIncompatibleVersions() { emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake( - discoveryNode, timeout)); + IllegalStateException ex = expectThrows(IllegalStateException.class, () -> { + try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, + ConnectionProfile.LIGHT_PROFILE)) { + handleA.transportService.handshake(connection, timeout); + } + }); assertThat(ex.getMessage(), containsString("handshake failed, incompatible version")); assertFalse(handleA.transportService.nodeConnected(discoveryNode)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index dd05457cec11a..a35a1919cb711 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -57,11 +57,13 @@ import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; @@ -80,6 +82,7 @@ */ public final class MockTransportService extends TransportService { + private final Map> openConnections = new HashMap<>(); public static class TestPlugin extends Plugin { @Override @@ -553,9 +556,7 @@ public void stop() { } @Override - public void close() { - transport.close(); - } + public void close() { transport.close(); } @Override public Map profileBoundAddresses() { @@ -701,4 +702,41 @@ public Transport getOriginalTransport() { } return transport; } + + @Override + public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException { + FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) { + final AtomicBoolean closed = new AtomicBoolean(false); + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + if (closed.compareAndSet(false, true)) { + synchronized (openConnections) { + List connections = openConnections.get(node); + boolean remove = connections.remove(this); + assert remove; + if (connections.isEmpty()) { + openConnections.remove(node); + } + } + } + } + + } + }; + synchronized (openConnections) { + List connections = openConnections.computeIfAbsent(node, + (n) -> new CopyOnWriteArrayList<>()); + connections.add(filteredConnection); + } + return filteredConnection; + } + + @Override + protected void doClose() { + super.doClose(); + assert openConnections.size() == 0 : "still open connections: " + openConnections; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 2beff7890b3ce..0f3f7b501953f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1346,8 +1346,8 @@ public void handleException(TransportException exp) { // all is well } - try { - serviceB.connectToNodeAndHandshake(nodeA, 100); + try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){ + serviceB.handshake(connection, 100); fail("exception should be thrown"); } catch (IllegalStateException e) { // all is well @@ -1404,8 +1404,8 @@ public void handleException(TransportException exp) { // all is well } - try { - serviceB.connectToNodeAndHandshake(nodeA, 100); + try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){ + serviceB.handshake(connection, 100); fail("exception should be thrown"); } catch (IllegalStateException e) { // all is well