Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -89,6 +90,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
private final Object mutex = new Object();

private volatile List<DiscoveryNode> nodes = Collections.emptyList();
// Filtered nodes are nodes whose cluster name does not match the configured cluster name
private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();

private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();
Expand Down Expand Up @@ -268,7 +270,7 @@ public static class RetryListener<Response> implements ActionListener<Response>
private volatile int i;

RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
this.callback = callback;
this.listener = listener;
this.nodes = nodes;
Expand Down Expand Up @@ -361,16 +363,16 @@ public void sample() {
protected abstract void doSample();

/**
* validates a set of potentially newly discovered nodes and returns an immutable
* list of the nodes that has passed.
* Establishes the node connections. If validateInHandshake is set to true, the connection will fail if
* node returned in the handshake response is different than the discovery node.
*/
protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes, boolean validateInHandshake) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateInHandshake not used anymore?

for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
try {
logger.trace("connecting to node [{}]", node);
transportService.connectToNode(node);
transportService.connectToNode(node, null, validateInHandshake);
} catch (Exception e) {
it.remove();
logger.debug(() -> new ParameterizedMessage("failed to connect to discovered node [{}]", node), e);
Expand All @@ -380,7 +382,6 @@ protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {

return Collections.unmodifiableList(new ArrayList<>(nodes));
}

}

class ScheduledNodeSampler implements Runnable {
Expand All @@ -402,14 +403,16 @@ class SimpleNodeSampler extends NodeSampler {
@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>();
for (DiscoveryNode listedNode : listedNodes) {
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
public LivenessResponse read(StreamInput in) throws IOException {
LivenessResponse response = new LivenessResponse();
response.readFrom(in);
return response;
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
Expand All @@ -435,8 +438,8 @@ public LivenessResponse newInstance() {
}
}

nodes = validateNewNodes(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
nodes = establishNodeConnections(newNodes, false);
filteredNodes = Collections.unmodifiableList(newFilteredNodes);
}
}

Expand Down Expand Up @@ -557,7 +560,7 @@ public void handleException(TransportException e) {
}
}

nodes = validateNewNodes(newNodes);
nodes = establishNodeConnections(newNodes, true);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ public boolean nodeConnected(DiscoveryNode node) {
return isLocalNode(node) || transport.nodeConnected(node);
}

/**
* Connect to the specified node with the default connection profile
*
* @param node the node to connect to
*/
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
connectToNode(node, null);
}
Expand All @@ -325,13 +330,25 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
* @param connectionProfile the connection profile to use when connecting to this node
*/
public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
connectToNode(node, connectionProfile, true);
}

/**
* Connect to the specified node with the given connection profile. This method allows the caller to
* specify whether the node connection should be validated against the specified node.
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param validateNodeConnection boolean indicating if the node connection should be validated
*/
public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile, boolean validateNodeConnection) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please not add another API here but rather try to detect if we are a client or a node? I mean down the road we don't want to support this anymore so lets make sure we don't have special public APIs. I can imagine a boolean on the ctor or even check if ClusterService is != null (not that intuitive). You can just add an overloaded ctor instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also just look at the settings like this TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey()));

if (isLocalNode(node)) {
return;
}
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
if (node.equals(remote) == false) {
if (validateNodeConnection && node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,21 @@ public void testNodeConnectWithDifferentNodeId() {
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}

public void testNodeConnectWithDifferentNodeIdSucceedsIfNoValidation() {
Settings settings = Settings.builder().put("cluster.name", "test").build();
NetworkHandle handleA = startServices("TS_A", settings, Version.CURRENT);
NetworkHandle handleB = startServices("TS_B", settings, Version.CURRENT);
DiscoveryNode discoveryNode = new DiscoveryNode(
randomAlphaOfLength(10),
handleB.discoveryNode.getAddress(),
emptyMap(),
emptySet(),
handleB.discoveryNode.getVersion());

handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE, false);
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
}


private static class NetworkHandle {
private TransportService transportService;
Expand Down