Skip to content

Commit ba06c14

Browse files
authored
TransportService.connectToNode should validate remote node ID (#22828)
#22194 gave us the ability to open low-level temporary connections to remote nodes based on their address. With this use case out of the way, actual full-blown connections should validate the node on the other side, making sure we are speaking to the node we think we are speaking to. This helps in cases where multiple nodes are started on the same host and a quick node restart causes them to swap addresses, which in turn can cause confusion down the road.
1 parent 245aa04 commit ba06c14

File tree

28 files changed

+590
-280
lines changed

28 files changed

+590
-280
lines changed

core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

Lines changed: 96 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.carrotsearch.hppc.cursors.ObjectCursor;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
2424
import org.apache.logging.log4j.util.Supplier;
25+
import org.apache.lucene.util.IOUtils;
2526
import org.elasticsearch.ExceptionsHelper;
2627
import org.elasticsearch.Version;
2728
import org.elasticsearch.action.ActionListener;
@@ -38,6 +39,7 @@
3839
import org.elasticsearch.common.settings.Settings;
3940
import org.elasticsearch.common.transport.TransportAddress;
4041
import org.elasticsearch.common.unit.TimeValue;
42+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4143
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4244
import org.elasticsearch.common.util.concurrent.FutureUtils;
4345
import org.elasticsearch.threadpool.ThreadPool;
@@ -46,6 +48,8 @@
4648
import org.elasticsearch.transport.FutureTransportResponseHandler;
4749
import org.elasticsearch.transport.NodeDisconnectedException;
4850
import org.elasticsearch.transport.NodeNotConnectedException;
51+
import org.elasticsearch.transport.PlainTransportFuture;
52+
import org.elasticsearch.transport.Transport;
4953
import org.elasticsearch.transport.TransportException;
5054
import org.elasticsearch.transport.TransportRequestOptions;
5155
import org.elasticsearch.transport.TransportResponseHandler;
@@ -401,51 +405,37 @@ protected void doSample() {
401405
HashSet<DiscoveryNode> newNodes = new HashSet<>();
402406
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
403407
for (DiscoveryNode listedNode : listedNodes) {
404-
if (!transportService.nodeConnected(listedNode)) {
405-
try {
406-
// its a listed node, light connect to it...
407-
logger.trace("connecting to listed node [{}]", listedNode);
408-
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
409-
} catch (Exception e) {
410-
logger.info(
411-
(Supplier<?>)
412-
() -> new ParameterizedMessage("failed to connect to node [{}], removed from nodes list", listedNode), e);
413-
hostFailureListener.onNodeDisconnected(listedNode, e);
414-
newFilteredNodes.add(listedNode);
415-
continue;
416-
}
417-
}
418-
try {
419-
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
420-
new LivenessRequest(),
421-
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
422-
new FutureTransportResponseHandler<LivenessResponse>() {
423-
@Override
424-
public LivenessResponse newInstance() {
425-
return new LivenessResponse();
426-
}
427-
}).txGet();
408+
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
409+
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
410+
new FutureTransportResponseHandler<LivenessResponse>() {
411+
@Override
412+
public LivenessResponse newInstance() {
413+
return new LivenessResponse();
414+
}
415+
});
416+
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
417+
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
418+
handler);
419+
final LivenessResponse livenessResponse = handler.txGet();
428420
if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
429421
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
430422
newFilteredNodes.add(listedNode);
431-
} else if (livenessResponse.getDiscoveryNode() != null) {
423+
} else {
432424
// use discovered information but do keep the original transport address,
433425
// so people can control which address is exactly used.
434426
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
435427
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
436428
nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
437429
nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
438-
} else {
439-
// although we asked for one node, our target may not have completed
440-
// initialization yet and doesn't have cluster nodes
441-
logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode);
442-
newNodes.add(listedNode);
443430
}
431+
} catch (ConnectTransportException e) {
432+
logger.debug(
433+
(Supplier<?>)
434+
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
435+
hostFailureListener.onNodeDisconnected(listedNode, e);
444436
} catch (Exception e) {
445437
logger.info(
446438
(Supplier<?>) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
447-
transportService.disconnectFromNode(listedNode);
448-
hostFailureListener.onNodeDisconnected(listedNode, e);
449439
}
450440
}
451441

@@ -470,78 +460,91 @@ protected void doSample() {
470460

471461
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
472462
final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
473-
for (final DiscoveryNode listedNode : nodesToPing) {
474-
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
475-
@Override
476-
public void run() {
477-
try {
478-
if (!transportService.nodeConnected(listedNode)) {
479-
try {
463+
try {
464+
for (final DiscoveryNode nodeToPing : nodesToPing) {
465+
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
466+
467+
/**
468+
* we try to reuse existing connections but if needed we will open a temporary connection
469+
* that will be closed at the end of the execution.
470+
*/
471+
Transport.Connection connectionToClose = null;
472+
473+
@Override
474+
public void onAfter() {
475+
IOUtils.closeWhileHandlingException(connectionToClose);
476+
}
480477

481-
// if its one of the actual nodes we will talk to, not to listed nodes, fully connect
482-
if (nodes.contains(listedNode)) {
483-
logger.trace("connecting to cluster node [{}]", listedNode);
484-
transportService.connectToNode(listedNode);
485-
} else {
486-
// its a listed node, light connect to it...
487-
logger.trace("connecting to listed node (light) [{}]", listedNode);
488-
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
489-
}
490-
} catch (Exception e) {
491-
logger.debug(
492-
(Supplier<?>)
493-
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
494-
latch.countDown();
495-
return;
478+
@Override
479+
public void onFailure(Exception e) {
480+
latch.countDown();
481+
if (e instanceof ConnectTransportException) {
482+
logger.debug((Supplier<?>)
483+
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", nodeToPing), e);
484+
hostFailureListener.onNodeDisconnected(nodeToPing, e);
485+
} else {
486+
logger.info(
487+
(Supplier<?>) () -> new ParameterizedMessage(
488+
"failed to get local cluster state info for {}, disconnecting...", nodeToPing), e);
489+
}
490+
}
491+
492+
@Override
493+
protected void doRun() throws Exception {
494+
Transport.Connection pingConnection = null;
495+
if (nodes.contains(nodeToPing)) {
496+
try {
497+
pingConnection = transportService.getConnection(nodeToPing);
498+
} catch (NodeNotConnectedException e) {
499+
// will use a temp connection
496500
}
497501
}
498-
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
499-
Requests.clusterStateRequest().clear().nodes(true).local(true),
500-
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
501-
.withTimeout(pingTimeout).build(),
502-
new TransportResponseHandler<ClusterStateResponse>() {
503-
504-
@Override
505-
public ClusterStateResponse newInstance() {
506-
return new ClusterStateResponse();
507-
}
502+
if (pingConnection == null) {
503+
logger.trace("connecting to cluster node [{}]", nodeToPing);
504+
connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
505+
pingConnection = connectionToClose;
506+
}
507+
transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
508+
Requests.clusterStateRequest().clear().nodes(true).local(true),
509+
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
510+
.withTimeout(pingTimeout).build(),
511+
new TransportResponseHandler<ClusterStateResponse>() {
512+
513+
@Override
514+
public ClusterStateResponse newInstance() {
515+
return new ClusterStateResponse();
516+
}
508517

509-
@Override
510-
public String executor() {
511-
return ThreadPool.Names.SAME;
512-
}
518+
@Override
519+
public String executor() {
520+
return ThreadPool.Names.SAME;
521+
}
513522

514-
@Override
515-
public void handleResponse(ClusterStateResponse response) {
516-
clusterStateResponses.put(listedNode, response);
517-
latch.countDown();
518-
}
523+
@Override
524+
public void handleResponse(ClusterStateResponse response) {
525+
clusterStateResponses.put(nodeToPing, response);
526+
latch.countDown();
527+
}
519528

520-
@Override
521-
public void handleException(TransportException e) {
522-
logger.info(
523-
(Supplier<?>) () -> new ParameterizedMessage(
524-
"failed to get local cluster state for {}, disconnecting...", listedNode), e);
525-
transportService.disconnectFromNode(listedNode);
529+
@Override
530+
public void handleException(TransportException e) {
531+
logger.info(
532+
(Supplier<?>) () -> new ParameterizedMessage(
533+
"failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
534+
try {
535+
hostFailureListener.onNodeDisconnected(nodeToPing, e);
536+
}
537+
finally {
526538
latch.countDown();
527-
hostFailureListener.onNodeDisconnected(listedNode, e);
528539
}
529-
});
530-
} catch (Exception e) {
531-
logger.info(
532-
(Supplier<?>)() -> new ParameterizedMessage(
533-
"failed to get local cluster state info for {}, disconnecting...", listedNode), e);
534-
transportService.disconnectFromNode(listedNode);
535-
latch.countDown();
536-
hostFailureListener.onNodeDisconnected(listedNode, e);
540+
}
541+
});
537542
}
538-
}
539-
});
540-
}
541-
542-
try {
543+
});
544+
}
543545
latch.await();
544546
} catch (InterruptedException e) {
547+
Thread.currentThread().interrupt();
545548
return;
546549
}
547550

core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -191,18 +191,24 @@ public DiscoveryNode(String nodeName, String nodeId, String ephemeralId, String
191191
/** Creates a DiscoveryNode representing the local node. */
192192
public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeId) {
193193
Map<String, String> attributes = new HashMap<>(Node.NODE_ATTRIBUTES.get(settings).getAsMap());
194-
Set<DiscoveryNode.Role> roles = new HashSet<>();
194+
Set<Role> roles = getRolesFromSettings(settings);
195+
196+
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
197+
}
198+
199+
/** extract node roles from the given settings */
200+
public static Set<Role> getRolesFromSettings(Settings settings) {
201+
Set<Role> roles = new HashSet<>();
195202
if (Node.NODE_INGEST_SETTING.get(settings)) {
196-
roles.add(DiscoveryNode.Role.INGEST);
203+
roles.add(Role.INGEST);
197204
}
198205
if (Node.NODE_MASTER_SETTING.get(settings)) {
199-
roles.add(DiscoveryNode.Role.MASTER);
206+
roles.add(Role.MASTER);
200207
}
201208
if (Node.NODE_DATA_SETTING.get(settings)) {
202-
roles.add(DiscoveryNode.Role.DATA);
209+
roles.add(Role.DATA);
203210
}
204-
205-
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
211+
return roles;
206212
}
207213

208214
/**
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common;
21+
22+
import java.util.function.BiConsumer;
23+
24+
/**
25+
* A {@link BiConsumer}-like interface which allows throwing checked exceptions.
26+
*/
27+
@FunctionalInterface
28+
public interface CheckedBiConsumer<T, U, E extends Exception> {
29+
void accept(T t, U u) throws E;
30+
}

0 commit comments

Comments
 (0)