Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ public List<String> getLocalAddresses() {
return transport.getLocalAddresses();
}

/**
* Returns <code>true</code> iff the given node is already connected.
*/
public boolean nodeConnected(DiscoveryNode node) {
return node.equals(localNode) || transport.nodeConnected(node);
}
Expand All @@ -311,6 +314,20 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
transport.connectToNode(node, connectionProfile);
}

/**
* Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers
* responsibility to close the connection once it goes out of scope.
* @param node the node to connect to
* @param profile the connection profile to use
*/
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException {
if (node.equals(localNode)) {
return localNodeConnection;
} else {
return transport.openConnection(node, profile);
}
}

/**
* Lightly connect to the specified node, returning updated node
* information. The handshake will fail if the cluster name on the
Expand All @@ -337,7 +354,19 @@ public DiscoveryNode connectToNodeAndHandshake(
return handshakeNode;
}

private DiscoveryNode handshake(
/**
* Executes a high-level handshake using the given connection
* and returns the discovery node of the node the connection
* was established with. The handshake will fail if the cluster
* name on the target node mismatches the local cluster name.
*
* @param connection the connection to a specific node
* @param handshakeTimeout handshake timeout
* @return the connected node
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public DiscoveryNode handshake(
final Transport.Connection connection,
final long handshakeTimeout) throws ConnectTransportException {
final HandshakeResponse response;
Expand Down Expand Up @@ -465,7 +494,7 @@ public final <T extends TransportResponse> void sendRequest(final DiscoveryNode
}
}

final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
Expand All @@ -477,7 +506,7 @@ final <T extends TransportResponse> void sendRequest(final Transport.Connection
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
* @throws NodeNotConnectedException if the given node is not connected
*/
private Transport.Connection getConnection(DiscoveryNode node) {
public Transport.Connection getConnection(DiscoveryNode node) {
if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) {
return localNodeConnection;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
Expand Down Expand Up @@ -571,7 +572,7 @@ private NetworkHandle startServices(
final BiFunction<Settings, Version, Transport> supplier) {
final Transport transport = supplier.apply(settings, version);
final TransportService transportService =
new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,24 @@ public void testConnectToNodeLight() throws IOException {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){
DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout);
assertNotNull(connectedNode);
// the name and version should be updated
assertEquals(connectedNode.getName(), "TS_B");
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}

DiscoveryNode connectedNode =
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
assertNotNull(connectedNode);

// the name and version should be updated
assertEquals(connectedNode.getName(), "TS_B");
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
assertTrue(handleA.transportService.nodeConnected(discoveryNode));

}

public void testMismatchedClusterName() {
Expand All @@ -133,8 +143,12 @@ public void testMismatchedClusterName() {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
discoveryNode, timeout));
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
ConnectionProfile.LIGHT_PROFILE)) {
handleA.transportService.handshake(connection, timeout);
}
});
assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
Expand All @@ -150,8 +164,12 @@ public void testIncompatibleVersions() {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
discoveryNode, timeout));
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
ConnectionProfile.LIGHT_PROFILE)) {
handleA.transportService.handshake(connection, timeout);
}
});
assertThat(ex.getMessage(), containsString("handshake failed, incompatible version"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -80,6 +82,7 @@
*/
public final class MockTransportService extends TransportService {

private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();

public static class TestPlugin extends Plugin {
@Override
Expand Down Expand Up @@ -553,9 +556,7 @@ public void stop() {
}

@Override
public void close() {
transport.close();
}
public void close() { transport.close(); }

@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
Expand Down Expand Up @@ -701,4 +702,41 @@ public Transport getOriginalTransport() {
}
return transport;
}

@Override
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) {
final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void close() throws IOException {
try {
super.close();
} finally {
if (closed.compareAndSet(false, true)) {
synchronized (openConnections) {
List<Transport.Connection> connections = openConnections.get(node);
boolean remove = connections.remove(this);
assert remove;
if (connections.isEmpty()) {
openConnections.remove(node);
}
}
}
}

}
};
synchronized (openConnections) {
List<Transport.Connection> connections = openConnections.computeIfAbsent(node,
(n) -> new CopyOnWriteArrayList<>());
connections.add(filteredConnection);
}
return filteredConnection;
}

@Override
protected void doClose() {
super.doClose();
assert openConnections.size() == 0 : "still open connections: " + openConnections;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1346,8 +1346,8 @@ public void handleException(TransportException exp) {
// all is well
}

try {
serviceB.connectToNodeAndHandshake(nodeA, 100);
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
serviceB.handshake(connection, 100);
fail("exception should be thrown");
} catch (IllegalStateException e) {
// all is well
Expand Down Expand Up @@ -1404,8 +1404,8 @@ public void handleException(TransportException exp) {
// all is well
}

try {
serviceB.connectToNodeAndHandshake(nodeA, 100);
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
serviceB.handshake(connection, 100);
fail("exception should be thrown");
} catch (IllegalStateException e) {
// all is well
Expand Down