Skip to content

Commit c693f67

Browse files
committed
Ensure we don't use a remote profile if cluster name matches (#31331)
If we run into a race condition between a node being configured as a remote node (for cross cluster search etc.) and that node joining the local cluster, we might connect to that node with a remote profile. If that node then joins the cluster that already connected to it as a CCS remote node, we keep using the wrong profile and can't use bulk connections etc. anymore. This change uses the remote profile only if we connect to a node that has a different cluster name than the local cluster. This is not a perfect fix for this situation, but it is the safe option: we potentially only lose a small optimization of using fewer connections per node, which is minor anyway since we only connect to a small set of nodes. Closes #29321
1 parent 32f70f9 commit c693f67

File tree

4 files changed

+154
-12
lines changed

4 files changed

+154
-12
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
9191
private volatile boolean skipUnavailable;
9292
private final ConnectHandler connectHandler;
9393
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
94+
private final ClusterName localClusterName;
9495

9596
/**
9697
* Creates a new {@link RemoteClusterConnection}
@@ -104,6 +105,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
104105
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
105106
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
106107
super(settings);
108+
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
107109
this.transportService = transportService;
108110
this.maxNumRemoteConnections = maxNumRemoteConnections;
109111
this.nodePredicate = nodePredicate;
@@ -314,6 +316,21 @@ public boolean isClosed() {
314316
return connectHandler.isClosed();
315317
}
316318

319+
private ConnectionProfile getRemoteProfile(ClusterName name) {
320+
// we can only compare the cluster name to make a decision if we should use a remote profile
321+
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
322+
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
323+
// rather smallish optimization on the connection layer under certain situations where remote clusters
324+
// have the same name as the local one is minor here.
325+
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
326+
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
327+
if (this.localClusterName.equals(name)) {
328+
return null;
329+
} else {
330+
return remoteProfile;
331+
}
332+
}
333+
317334
/**
318335
* The connect handler manages node discovery and the actual connect to the remote cluster.
319336
* There is at most one connect job running at any time. If such a connect job is triggered
@@ -423,7 +440,6 @@ protected void doRun() {
423440
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
424441
}
425442
});
426-
427443
}
428444

429445
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
@@ -435,21 +451,27 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
435451
if (seedNodes.hasNext()) {
436452
cancellableThreads.executeIO(() -> {
437453
final DiscoveryNode seedNode = seedNodes.next();
438-
final DiscoveryNode handshakeNode;
454+
final TransportService.HandshakeResponse handshakeResponse;
439455
Transport.Connection connection = transportService.openConnection(seedNode,
440456
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
441457
boolean success = false;
442458
try {
443459
try {
444-
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
460+
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
445461
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
446462
} catch (IllegalStateException ex) {
447463
logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
448464
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
449465
throw ex;
450466
}
467+
468+
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
451469
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
452-
transportService.connectToNode(handshakeNode, remoteProfile);
470+
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
471+
if (remoteClusterName.get() == null) {
472+
assert handshakeResponse.getClusterName().value() != null;
473+
remoteClusterName.set(handshakeResponse.getClusterName());
474+
}
453475
connectedNodes.add(handshakeNode);
454476
}
455477
ClusterStateRequest request = new ClusterStateRequest();
@@ -556,7 +578,8 @@ public void handleResponse(ClusterStateResponse response) {
556578
for (DiscoveryNode node : nodesIter) {
557579
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
558580
try {
559-
transportService.connectToNode(node, remoteProfile); // noop if node is connected
581+
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
582+
// connected
560583
connectedNodes.add(node);
561584
} catch (ConnectTransportException | IllegalStateException ex) {
562585
// ISE if we fail the handshake with an version incompatible node

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,8 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
341341
return;
342342
}
343343
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
344-
// We don't validate cluster names to allow for tribe node connections.
345-
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
344+
// We don't validate cluster names to allow for CCS connections.
345+
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
346346
if (validateConnections && node.equals(remote) == false) {
347347
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
348348
}
@@ -378,7 +378,7 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP
378378
public DiscoveryNode handshake(
379379
final Transport.Connection connection,
380380
final long handshakeTimeout) throws ConnectTransportException {
381-
return handshake(connection, handshakeTimeout, clusterName::equals);
381+
return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode;
382382
}
383383

384384
/**
@@ -390,11 +390,11 @@ public DiscoveryNode handshake(
390390
* @param connection the connection to a specific node
391391
* @param handshakeTimeout handshake timeout
392392
* @param clusterNamePredicate cluster name validation predicate
393-
* @return the connected node
393+
* @return the handshake response
394394
* @throws ConnectTransportException if the connection failed
395395
* @throws IllegalStateException if the handshake failed
396396
*/
397-
public DiscoveryNode handshake(
397+
public HandshakeResponse handshake(
398398
final Transport.Connection connection,
399399
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
400400
final HandshakeResponse response;
@@ -420,7 +420,7 @@ public HandshakeResponse newInstance() {
420420
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
421421
}
422422

423-
return response.discoveryNode;
423+
return response;
424424
}
425425

426426
static class HandshakeRequest extends TransportRequest {
@@ -461,6 +461,14 @@ public void writeTo(StreamOutput out) throws IOException {
461461
clusterName.writeTo(out);
462462
Version.writeVersion(version, out);
463463
}
464+
465+
public DiscoveryNode getDiscoveryNode() {
466+
return discoveryNode;
467+
}
468+
469+
public ClusterName getClusterName() {
470+
return clusterName;
471+
}
464472
}
465473

466474
public void disconnectFromNode(DiscoveryNode node) {

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,102 @@ public static MockTransportService startTransport(
149149
}
150150
}
151151

152+
public void testLocalProfileIsUsedForLocalCluster() throws Exception {
153+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
154+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
155+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
156+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
157+
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
158+
knownNodes.add(seedTransport.getLocalDiscoNode());
159+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
160+
Collections.shuffle(knownNodes, random());
161+
162+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
163+
service.start();
164+
service.acceptIncomingRequests();
165+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
166+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
167+
updateSeedNodes(connection, Arrays.asList(seedNode));
168+
assertTrue(service.nodeConnected(seedNode));
169+
assertTrue(service.nodeConnected(discoverableNode));
170+
assertTrue(connection.assertNoRunningConnections());
171+
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
172+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
173+
@Override
174+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
175+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
176+
inst.readFrom(in);
177+
return inst;
178+
}
179+
});
180+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
181+
.build();
182+
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
183+
options, futureHandler);
184+
futureHandler.txGet();
185+
}
186+
}
187+
}
188+
}
189+
190+
public void testRemoteProfileIsUsedForRemoteCluster() throws Exception {
191+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
192+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool,
193+
Settings.builder().put("cluster.name", "foobar").build());
194+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT,
195+
threadPool, Settings.builder().put("cluster.name", "foobar").build())) {
196+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
197+
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
198+
knownNodes.add(seedTransport.getLocalDiscoNode());
199+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
200+
Collections.shuffle(knownNodes, random());
201+
202+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
203+
service.start();
204+
service.acceptIncomingRequests();
205+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
206+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
207+
updateSeedNodes(connection, Arrays.asList(seedNode));
208+
assertTrue(service.nodeConnected(seedNode));
209+
assertTrue(service.nodeConnected(discoverableNode));
210+
assertTrue(connection.assertNoRunningConnections());
211+
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
212+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
213+
@Override
214+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
215+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
216+
inst.readFrom(in);
217+
return inst;
218+
}
219+
});
220+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
221+
.build();
222+
IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
223+
service.sendRequest(discoverableNode,
224+
ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler);
225+
futureHandler.txGet();
226+
}).getCause();
227+
assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");
228+
229+
PlainTransportFuture<ClusterSearchShardsResponse> handler = new PlainTransportFuture<>(
230+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
231+
@Override
232+
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
233+
ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse();
234+
inst.readFrom(in);
235+
return inst;
236+
}
237+
});
238+
TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)
239+
.build();
240+
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
241+
ops, handler);
242+
handler.txGet();
243+
}
244+
}
245+
}
246+
}
247+
152248
public void testDiscoverSingleNode() throws Exception {
153249
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
154250
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,22 @@ protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeo
198198
@Override
199199
protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
200200
ConnectionProfile connectionProfile1 = resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
201-
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(LIGHT_PROFILE);
201+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
202+
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
203+
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
204+
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) {
205+
Set<TransportRequestOptions.Type> types = handle.getTypes();
206+
if (handle.length > 0) {
207+
allTypesWithConnection.addAll(types);
208+
} else {
209+
allTypesWithoutConnection.addAll(types);
210+
}
211+
}
212+
// make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
213+
builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
214+
if (allTypesWithoutConnection.isEmpty() == false) {
215+
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
216+
}
202217
builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout());
203218
builder.setConnectTimeout(connectionProfile1.getConnectTimeout());
204219
return builder.build();

0 commit comments

Comments
 (0)