Skip to content

Commit c2efb4b

Browse files
committed
Transport client: Don't validate node in handshake (elastic#30737)
This is related to elastic#30141. Right now in the transport client we open a temporary node connection and take the node information. This node information is used to open a permanent connection that is used for the client. However, we continue to use the configured transport address. If the configured transport address is a load balancer, you might connect to a different node for the permanent connection. This causes the handshake validation to fail. This commit removes the handshake validation for the transport client when it simple node sample mode.
1 parent b81366f commit c2efb4b

File tree

3 files changed

+66
-13
lines changed

3 files changed

+66
-13
lines changed

server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import com.carrotsearch.hppc.cursors.ObjectCursor;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
24-
import org.apache.logging.log4j.util.Supplier;
24+
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.core.internal.io.IOUtils;
2626
import org.elasticsearch.ExceptionsHelper;
2727
import org.elasticsearch.Version;
@@ -56,6 +56,7 @@
5656
import org.elasticsearch.transport.TransportService;
5757

5858
import java.io.Closeable;
59+
import java.io.IOException;
5960
import java.util.ArrayList;
6061
import java.util.Collections;
6162
import java.util.HashSet;
@@ -89,6 +90,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
8990
private final Object mutex = new Object();
9091

9192
private volatile List<DiscoveryNode> nodes = Collections.emptyList();
93+
// Filtered nodes are nodes whose cluster name does not match the configured cluster name
9294
private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();
9395

9496
private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();
@@ -268,7 +270,7 @@ public static class RetryListener<Response> implements ActionListener<Response>
268270
private volatile int i;
269271

270272
RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
271-
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
273+
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
272274
this.callback = callback;
273275
this.listener = listener;
274276
this.nodes = nodes;
@@ -361,10 +363,10 @@ public void sample() {
361363
protected abstract void doSample();
362364

363365
/**
364-
* validates a set of potentially newly discovered nodes and returns an immutable
365-
* list of the nodes that has passed.
366+
* Establishes the node connections. If validateInHandshake is set to true, the connection will fail if
367+
* node returned in the handshake response is different than the discovery node.
366368
*/
367-
protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
369+
List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes) {
368370
for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
369371
DiscoveryNode node = it.next();
370372
if (!transportService.nodeConnected(node)) {
@@ -380,7 +382,6 @@ protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
380382

381383
return Collections.unmodifiableList(new ArrayList<>(nodes));
382384
}
383-
384385
}
385386

386387
class ScheduledNodeSampler implements Runnable {
@@ -402,14 +403,16 @@ class SimpleNodeSampler extends NodeSampler {
402403
@Override
403404
protected void doSample() {
404405
HashSet<DiscoveryNode> newNodes = new HashSet<>();
405-
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
406+
ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>();
406407
for (DiscoveryNode listedNode : listedNodes) {
407408
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
408409
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
409410
new FutureTransportResponseHandler<LivenessResponse>() {
410411
@Override
411-
public LivenessResponse newInstance() {
412-
return new LivenessResponse();
412+
public LivenessResponse read(StreamInput in) throws IOException {
413+
LivenessResponse response = new LivenessResponse();
414+
response.readFrom(in);
415+
return response;
413416
}
414417
});
415418
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
@@ -435,8 +438,8 @@ public LivenessResponse newInstance() {
435438
}
436439
}
437440

438-
nodes = validateNewNodes(newNodes);
439-
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
441+
nodes = establishNodeConnections(newNodes);
442+
filteredNodes = Collections.unmodifiableList(newFilteredNodes);
440443
}
441444
}
442445

@@ -557,7 +560,7 @@ public void handleException(TransportException e) {
557560
}
558561
}
559562

560-
nodes = validateNewNodes(newNodes);
563+
nodes = establishNodeConnections(newNodes);
561564
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
562565
}
563566
}

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.elasticsearch.client.Client;
25+
import org.elasticsearch.client.transport.TransportClient;
2426
import org.elasticsearch.core.internal.io.IOUtils;
2527
import org.elasticsearch.Version;
2628
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
@@ -124,6 +126,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
124126

125127
private final RemoteClusterService remoteClusterService;
126128

129+
private final boolean validateConnections;
130+
127131
/** if set will call requests sent to this id to shortcut and executed locally */
128132
volatile DiscoveryNode localNode = null;
129133
private final Transport.Connection localNodeConnection = new Transport.Connection() {
@@ -153,6 +157,9 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
153157
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
154158
Set<String> taskHeaders) {
155159
super(settings);
160+
// The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
161+
this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
162+
TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
156163
this.transport = transport;
157164
this.threadPool = threadPool;
158165
this.localNodeFactory = localNodeFactory;
@@ -314,6 +321,11 @@ public boolean nodeConnected(DiscoveryNode node) {
314321
return isLocalNode(node) || transport.nodeConnected(node);
315322
}
316323

324+
/**
325+
* Connect to the specified node with the default connection profile
326+
*
327+
* @param node the node to connect to
328+
*/
317329
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
318330
connectToNode(node, null);
319331
}
@@ -331,7 +343,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
331343
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
332344
// We don't validate cluster names to allow for tribe node connections.
333345
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
334-
if (node.equals(remote) == false) {
346+
if (validateConnections && node.equals(remote) == false) {
335347
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
336348
}
337349
});

server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.elasticsearch.transport;
2121

2222
import org.elasticsearch.Version;
23+
import org.elasticsearch.client.Client;
24+
import org.elasticsearch.client.transport.TransportClient;
2325
import org.elasticsearch.cluster.node.DiscoveryNode;
2426
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2527
import org.elasticsearch.common.network.NetworkService;
@@ -178,6 +180,42 @@ public void testNodeConnectWithDifferentNodeId() {
178180
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
179181
}
180182

183+
public void testNodeConnectWithDifferentNodeIdSucceedsIfThisIsTransportClientOfSimpleNodeSampler() {
184+
Settings.Builder settings = Settings.builder().put("cluster.name", "test");
185+
Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE).build();
186+
NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT);
187+
NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT);
188+
DiscoveryNode discoveryNode = new DiscoveryNode(
189+
randomAlphaOfLength(10),
190+
handleB.discoveryNode.getAddress(),
191+
emptyMap(),
192+
emptySet(),
193+
handleB.discoveryNode.getVersion());
194+
195+
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
196+
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
197+
}
198+
199+
public void testNodeConnectWithDifferentNodeIdFailsWhenSnifferTransportClient() {
200+
Settings.Builder settings = Settings.builder().put("cluster.name", "test");
201+
Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE)
202+
.put(TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), true)
203+
.build();
204+
NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT);
205+
NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT);
206+
DiscoveryNode discoveryNode = new DiscoveryNode(
207+
randomAlphaOfLength(10),
208+
handleB.discoveryNode.getAddress(),
209+
emptyMap(),
210+
emptySet(),
211+
handleB.discoveryNode.getVersion());
212+
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
213+
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
214+
});
215+
assertThat(ex.getMessage(), containsString("unexpected remote node"));
216+
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
217+
}
218+
181219

182220
private static class NetworkHandle {
183221
private TransportService transportService;

0 commit comments

Comments
 (0)